From 5883a1ad9483eba3871968d9b8c5dfd9c3db12c1 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Julius=20B=C3=BCnger?= Date: Thu, 22 Jan 2015 00:18:44 +0000 Subject: [PATCH] restructured service and sampler --- src/rps/gnunet-service-rps.c | 340 +++++++++++++++++++-------- src/rps/gnunet-service-rps_sampler.c | 171 ++++++-------- src/rps/gnunet-service-rps_sampler.h | 17 +- 3 files changed, 317 insertions(+), 211 deletions(-) diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 6caa77c40..9a7519728 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c @@ -70,22 +70,6 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg; */ static struct GNUNET_PeerIdentity *own_identity; -/** - * Closure to the callback cadet calls on each peer it passes to us - */ -struct init_peer_cls -{ - /** - * The server handle to later listen to client requests - */ - struct GNUNET_SERVER_Handle *server; - - /** - * Counts how many peers cadet already passed to us - */ - uint32_t i; -}; - struct GNUNET_PeerIdentity * get_rand_peer (const struct GNUNET_PeerIdentity *peer_list, unsigned int size); @@ -122,6 +106,29 @@ enum PeerFlags LIVING = 0x10 }; + +/** + * Functions of this type can be used to be stored at a peer for later execution. + */ +typedef void (* PeerOp) (void *cls, const struct GNUNET_PeerIdentity *peer); + +/** + * Outstanding operation on peer consisting of callback and closure + */ +struct PeerOutstandingOp +{ + /** + * Callback + */ + PeerOp op; + + /** + * Closure + */ + void *op_cls; +}; + + /** * Struct used to keep track of other peer's status * @@ -149,6 +156,17 @@ struct PeerContext */ struct GNUNET_CADET_Channel *recv_channel; // unneeded? + /** + * Array of outstanding operations on this peer. + */ + struct PeerOutstandingOp *outstanding_ops; + + /** + * Number of outstanding operations. + */ + unsigned int num_outstanding_ops; + //size_t num_outstanding_ops; + /** * This is pobably followed by 'statistical' data (when we first saw * him, how did we get his ID, how many pushes (in a timeinterval), @@ -310,6 +328,12 @@ static struct GNUNET_TIME_Relative request_deltas[REQUEST_DELTAS_SIZE]; static struct GNUNET_TIME_Relative request_rate; +/** + * Number of history update tasks. + */ +uint32_t num_hist_update_tasks; + + /*********************************************************************** * /Globals ***********************************************************************/ @@ -398,6 +422,8 @@ get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, ctx->mq = NULL; ctx->send_channel = NULL; ctx->recv_channel = NULL; + ctx->outstanding_ops = NULL; + ctx->num_outstanding_ops = 0; (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); } @@ -405,6 +431,22 @@ get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, } +/** + * Put random peer from sampler into the gossip list as history update. + */ + void +hist_update (void *cls, struct GNUNET_PeerIdentity *ids, uint32_t num_peers) +{ + GNUNET_assert (1 == num_peers); + + if (gossip_list_size < sampler_size_est_need) + GNUNET_array_append (gossip_list, gossip_list_size, *ids); + + if (0 < num_hist_update_tasks) + num_hist_update_tasks--; +} + + /** * Callback that is called when a channel was effectively established. * This is given to ntfy_tmt_rdy and called when the channel was @@ -422,6 +464,15 @@ peer_is_live (void *cls, size_t size, void *buf) LOG (GNUNET_ERROR_TYPE_DEBUG, "Peer %s is live\n", GNUNET_i2s (peer)); + if (0 != peer_ctx->num_outstanding_ops) + { /* Call outstanding operations */ + unsigned int i; + + for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ ) + peer_ctx->outstanding_ops[i].op (peer_ctx->outstanding_ops[i].op_cls, peer); + GNUNET_array_grow (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, 0); + } + GNUNET_free (peer); buf = NULL; @@ -437,7 +488,7 @@ get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct GNUNET_PeerIdentity *peer) { struct PeerContext *ctx; - //struct GNUNET_PeerIdentity *tmp_peer; + struct GNUNET_PeerIdentity *tmp_peer; ctx = get_peer_ctx (peer_map, peer); if (NULL == ctx->send_channel) @@ -446,11 +497,14 @@ get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, GNUNET_RPS_CADET_PORT, GNUNET_CADET_OPTION_RELIABLE); - //tmp_peer = GNUNET_new (struct GNUNET_PeerIdentity); - //*tmp_peer = *peer; - //(void) GNUNET_CADET_notify_transmit_ready (ctx->send_channel, GNUNET_NO, - // GNUNET_TIME_UNIT_FOREVER_REL, - // 0, peer_is_live, tmp_peer); + if (NULL == ctx->recv_channel) + { + tmp_peer = GNUNET_new (struct GNUNET_PeerIdentity); + *tmp_peer = *peer; + (void) GNUNET_CADET_notify_transmit_ready (ctx->send_channel, GNUNET_NO, + GNUNET_TIME_UNIT_FOREVER_REL, + 0, peer_is_live, tmp_peer); + } // do I have to explicitly put it in the peer_map? (void) GNUNET_CONTAINER_multipeermap_put (peer_map, peer, ctx, @@ -509,16 +563,91 @@ T_relative_sum (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size) struct GNUNET_TIME_Relative T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint32_t arr_size) { - return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size), arr_size); // FIXME find a way to devide that by arr_size + return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size), arr_size); +} + + +/** + * Insert PeerID in #pull_list + * + * Called once we know a peer is live. + */ + void +insert_in_pull_list (void *cls, const struct GNUNET_PeerIdentity *peer) +{ + if (GNUNET_NO == in_arr (pull_list, pull_list_size, peer)) + GNUNET_array_append (pull_list, pull_list_size, *peer); +} + +/** + * Check whether #insert_in_pull_list was already scheduled + */ + int +insert_in_pull_list_scheduled (const struct PeerContext *peer_ctx) +{ + unsigned int i; + + for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ ) + if (insert_in_pull_list == peer_ctx->outstanding_ops[i].op) + return GNUNET_YES; + return GNUNET_NO; +} + + +/** + * Insert PeerID in #gossip_list + * + * Called once we know a peer is live. + */ + void +insert_in_gossip_list (void *cls, const struct GNUNET_PeerIdentity *peer) +{ + if (GNUNET_NO == in_arr (gossip_list, gossip_list_size, peer)) + GNUNET_array_append (gossip_list, gossip_list_size, *peer); +} + +/** + * Check whether #insert_in_pull_list was already scheduled + */ + int +insert_in_gossip_list_scheduled (const struct PeerContext *peer_ctx) +{ + unsigned int i; + + for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ ) + if (insert_in_gossip_list == peer_ctx->outstanding_ops[i].op) + return GNUNET_YES; + return GNUNET_NO; +} + + +/** + * Update sampler with given PeerID. + */ + void +insert_in_sampler (void *cls, const struct GNUNET_PeerIdentity *peer) +{ + RPS_sampler_update_list (peer); +} + +/** + * Check whether #insert_in_sampler was already scheduled + */ + int +insert_in_sampler_scheduled (const struct PeerContext *peer_ctx) +{ + unsigned int i; + + for ( i = 0 ; i < peer_ctx->num_outstanding_ops ; i++ ) + if (insert_in_sampler== peer_ctx->outstanding_ops[i].op) + return GNUNET_YES; + return GNUNET_NO; } -/*********************************************************************** - * /Util functions -***********************************************************************/ /** - * Wrapper around _sampler_resize() + * Wrapper around #RPS_sampler_resize() */ void resize_wrapper () @@ -544,6 +673,10 @@ resize_wrapper () } +/*********************************************************************** + * /Util functions +***********************************************************************/ + /** * Function called by NSE. * @@ -660,7 +793,7 @@ handle_client_request (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested %" PRIX32 " random peer(s).\n", num_peers); - RPS_sampler_get_n_rand_peers (client_respond, client, num_peers); + RPS_sampler_get_n_rand_peers (client_respond, client, num_peers, GNUNET_YES); GNUNET_SERVER_receive_done (client, GNUNET_OK); @@ -806,6 +939,8 @@ handle_peer_pull_reply (void *cls, struct GNUNET_RPS_P2P_PullReplyMessage *in_msg; struct GNUNET_PeerIdentity *peers; + struct PeerContext *peer_ctx; + struct PeerOutstandingOp out_op; uint32_t i; if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) > ntohs (msg->size)) @@ -828,8 +963,19 @@ handle_peer_pull_reply (void *cls, peers = (struct GNUNET_PeerIdentity *) &msg[1]; for ( i = 0 ; i < ntohl (in_msg->num_peers) ; i++ ) { - if (GNUNET_NO == in_arr (pull_list, pull_list_size, &peers[i])) - GNUNET_array_append (pull_list, pull_list_size, peers[i]); + peer_ctx = get_peer_ctx (peer_map, &peers[i]); + + if ((0 != (peer_ctx->peer_flags && LIVING)) || + NULL != peer_ctx->recv_channel) + { + if (GNUNET_NO == in_arr (pull_list, pull_list_size, &peers[i])) + GNUNET_array_append (pull_list, pull_list_size, peers[i]); + } + else if (GNUNET_NO == insert_in_pull_list_scheduled (peer_ctx)) + { + out_op.op = insert_in_pull_list; + GNUNET_array_append (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, out_op); + } } // TODO check that id is valid - whether it is reachable @@ -865,44 +1011,50 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /* Send PUSHes */ //n_arr = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) gossip_list_size); - n_peers = round (alpha * gossip_list_size); - if (0 == n_peers) - n_peers = 1; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %u (%f * %u) peers.\n", - n_peers, alpha, gossip_list_size); - for ( i = 0 ; i < n_peers ; i++ ) + if (0 != gossip_list_size) { - peer = get_rand_peer (gossip_list, gossip_list_size); - if (own_identity != peer) - { // FIXME if this fails schedule/loop this for later - LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped list.\n", GNUNET_i2s (peer)); - - ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH); - // FIXME sometimes it returns a pointer to a freed mq - mq = get_mq (peer_map, peer); - GNUNET_MQ_send (mq, ev); + n_peers = round (alpha * gossip_list_size); + if (0 == n_peers) + n_peers = 1; + LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pushes to %u (%f * %u) peers.\n", + n_peers, alpha, gossip_list_size); + for ( i = 0 ; i < n_peers ; i++ ) + { + peer = get_rand_peer (gossip_list, gossip_list_size); + if (own_identity != peer) + { // FIXME if this fails schedule/loop this for later + LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PUSH to peer %s of gossiped list.\n", GNUNET_i2s (peer)); + + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PUSH); + // FIXME sometimes it returns a pointer to a freed mq + mq = get_mq (peer_map, peer); + GNUNET_MQ_send (mq, ev); + } } } /* Send PULL requests */ //n_arr = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG, (unsigned int) sampler_list->size); - n_peers = round (beta * gossip_list_size); - if (0 == n_peers) - n_peers = 1; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %u (%f * %u) peers.\n", - n_peers, beta, gossip_list_size); - for ( i = 0 ; i < n_peers ; i++ ) + if (0 != gossip_list_size) { - peer = get_rand_peer (gossip_list, gossip_list_size); - if (own_identity != peer) - { // FIXME if this fails schedule/loop this for later - LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of gossiped list.\n", GNUNET_i2s (peer)); - - ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST); - //pull_msg = NULL; - mq = get_mq (peer_map, peer); - GNUNET_MQ_send (mq, ev); + n_peers = round (beta * gossip_list_size); + if (0 == n_peers) + n_peers = 1; + LOG (GNUNET_ERROR_TYPE_DEBUG, "Going to send pulls to %u (%f * %u) peers.\n", + n_peers, beta, gossip_list_size); + for ( i = 0 ; i < n_peers ; i++ ) + { + peer = get_rand_peer (gossip_list, gossip_list_size); + if (own_identity != peer) + { // FIXME if this fails schedule/loop this for later + LOG (GNUNET_ERROR_TYPE_DEBUG, "Sending PULL request to peer %s of gossiped list.\n", GNUNET_i2s (peer)); + + ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST); + //pull_msg = NULL; + mq = get_mq (peer_map, peer); + GNUNET_MQ_send (mq, ev); + } } } @@ -914,14 +1066,16 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) push_list_size != 0 && pull_list_size != 0 ) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list. ()\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list.\n"); uint32_t first_border; uint32_t second_border; - GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size_est_need); + first_border = round (alpha * sampler_size_est_need); + second_border = first_border + round (beta * sampler_size_est_need); + + GNUNET_array_grow (gossip_list, gossip_list_size, second_border); - first_border = round (alpha * gossip_list_size); for ( i = 0 ; i < first_border ; i++ ) { // TODO use RPS_sampler_get_n_rand_peers /* Update gossip list with peers received through PUSHes */ @@ -931,7 +1085,6 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) // TODO change the peer_flags accordingly } - second_border = first_border + round (beta * gossip_list_size); for ( i = first_border ; i < second_border ; i++ ) { /* Update gossip list with peers received through PULLs */ @@ -944,8 +1097,8 @@ do_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) for ( i = second_border ; i < gossip_list_size ; i++ ) { /* Update gossip list with peers from history */ - peer = RPS_sampler_get_n_rand_peers_ (1); - gossip_list[i] = *peer; + RPS_sampler_get_n_rand_peers (hist_update, NULL, 1, GNUNET_NO); + num_hist_update_tasks++; // TODO change the peer_flags accordingly } @@ -1058,37 +1211,38 @@ init_peer_cb (void *cls, unsigned int best_path) // "How long is the best path? // (0 = unknown, 1 = ourselves, 2 = neighbor)" { - struct init_peer_cls *ipc; + struct GNUNET_SERVER_Handle *server; + struct PeerOutstandingOp out_op; + struct PeerContext *peer_ctx; - ipc = (struct init_peer_cls *) cls; + server = (struct GNUNET_SERVER_Handle *) cls; if ( NULL != peer ) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "Got %" PRIX32 ". peer %s (at %p) from CADET (gossip_list_size: %u)\n", - ipc->i, GNUNET_i2s (peer), peer, gossip_list_size); - RPS_sampler_update_list (peer); - (void) get_peer_ctx (peer_map, peer); // unneeded? -> insertCB + "Got peer %s (at %p) from CADET (gossip_list_size: %u)\n", + GNUNET_i2s (peer), peer, gossip_list_size); - if (ipc->i < gossip_list_size) + // maybe create a function for that + peer_ctx = get_peer_ctx (peer_map, peer); + if (GNUNET_NO == insert_in_sampler_scheduled (peer_ctx)) { - gossip_list[ipc->i] = *peer; // FIXME sometimes we're writing to invalid space here - // not sure whether fixed - ipc->i++; + out_op.op = insert_in_sampler; + GNUNET_array_append (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, out_op); } - // send push/pull to each of those peers? - } - else - { - if (ipc->i < gossip_list_size) + if (GNUNET_NO == insert_in_gossip_list_scheduled (peer_ctx)) { - memcpy (&gossip_list[ipc->i], - RPS_sampler_get_n_rand_peers_ (1), - (gossip_list_size - ipc->i) * sizeof (struct GNUNET_PeerIdentity)); + out_op.op = insert_in_gossip_list; + GNUNET_array_append (peer_ctx->outstanding_ops, peer_ctx->num_outstanding_ops, out_op); } - rps_start (ipc->server); - GNUNET_free (ipc); + + /* Issue livelyness test on peer */ + (void) get_channel (peer_map, peer); + + // send push/pull to each of those peers? } + else + rps_start (server); } @@ -1236,6 +1390,7 @@ cleanup_channel (void *cls, (void) peer_remove_cb (peer, peer, peer_ctx); } + /** * Actually start the service. */ @@ -1256,6 +1411,8 @@ rps_start (struct GNUNET_SERVER_Handle *server) LOG (GNUNET_ERROR_TYPE_DEBUG, "Ready to receive requests from clients\n"); + num_hist_update_tasks = 0; + do_round_task = GNUNET_SCHEDULER_add_now (&do_round, NULL); LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n"); @@ -1283,7 +1440,6 @@ run (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "RPS started\n"); - struct init_peer_cls *ipc; cfg = c; @@ -1316,16 +1472,13 @@ run (void *cls, } LOG (GNUNET_ERROR_TYPE_DEBUG, "INITSIZE is %" PRIu64 "\n", sampler_size_est_need); - //gossip_list_size = sampler_size; // TODO rename sampler_size gossip_list = NULL; - GNUNET_array_grow (gossip_list, gossip_list_size, sampler_size_est_need); /* connect to NSE */ nse = GNUNET_NSE_connect (cfg, nse_callback, NULL); // TODO check whether that was successful - // TODO disconnect on shutdown LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to NSE\n"); @@ -1383,7 +1536,7 @@ run (void *cls, half_round_interval = GNUNET_TIME_relative_multiply (round_interval, .5); max_round_interval = GNUNET_TIME_relative_add (round_interval, half_round_interval); - RPS_sampler_init (sampler_size_est_need, own_identity, max_round_interval, + RPS_sampler_init (sampler_size_est_need, max_round_interval, insertCB, NULL, removeCB, NULL); sampler_size = sampler_size_est_need; @@ -1394,11 +1547,8 @@ run (void *cls, pull_list_size = 0; - ipc = GNUNET_new (struct init_peer_cls); - ipc->server = server; - ipc->i = 0; LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n"); - GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, ipc); + GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, server); // TODO send push/pull to each of those peers? } diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c index 85d8d532b..b2ee5fb21 100644 --- a/src/rps/gnunet-service-rps_sampler.c +++ b/src/rps/gnunet-service-rps_sampler.c @@ -151,7 +151,7 @@ struct RPS_Sampler /** * Closure to _get_n_rand_peers_ready_cb() */ -struct RPS_GetNRandPeersReadyCls +struct NRandPeersReadyCls { /** * Number of peers we are waiting for. @@ -255,15 +255,15 @@ static uint32_t client_get_index; * give those back. */ void -RPS_sampler_get_n_rand_peers_ready_cb (void *cls, +check_n_peers_ready (void *cls, const struct GNUNET_PeerIdentity *id) { - struct RPS_GetNRandPeersReadyCls *n_peers_cls; + struct NRandPeersReadyCls *n_peers_cls; - n_peers_cls = (struct RPS_GetNRandPeersReadyCls *) cls; + n_peers_cls = (struct NRandPeersReadyCls *) cls; LOG (GNUNET_ERROR_TYPE_DEBUG, - "SAMPLER: Got %" PRIX32 "th of %" PRIX32 " peers\n", + "SAMPLER: Got %" PRIX32 ". of %" PRIX32 " peers\n", n_peers_cls->cur_num_peers, n_peers_cls->num_peers); if (n_peers_cls->num_peers - 1 == n_peers_cls->cur_num_peers) @@ -297,12 +297,6 @@ RPS_sampler_elem_reinit (struct RPS_SamplerElement *sampler_el) sampler_el->last_client_request = GNUNET_TIME_UNIT_FOREVER_ABS; - /* We might want to keep the previous peer */ - - //GNUNET_CRYPTO_hmac(&sampler_el->auth_key, sampler_el->peer_id, - // sizeof(struct GNUNET_PeerIdentity), - // &sampler_el->peer_id_hash); - sampler_el->birth = GNUNET_TIME_absolute_get (); sampler_el->num_peers = 0; sampler_el->num_change = 0; @@ -479,7 +473,6 @@ RPS_sampler_resize (unsigned int new_size) * Initialise a tuple of sampler elements. * * @param init_size the size the sampler is initialised with - * @param id with which all newly created sampler elements are initialised * @param ins_cb the callback that will be called on every PeerID that is * newly inserted into a sampler element * @param ins_cls the closure given to #ins_cb @@ -489,7 +482,6 @@ RPS_sampler_resize (unsigned int new_size) */ void RPS_sampler_init (size_t init_size, - const struct GNUNET_PeerIdentity *id, struct GNUNET_TIME_Relative max_round_interval, RPS_sampler_insert_cb ins_cb, void *ins_cls, RPS_sampler_remove_cb rem_cb, void *rem_cls) @@ -513,7 +505,6 @@ RPS_sampler_init (size_t init_size, //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); RPS_sampler_resize (init_size); - RPS_sampler_update_list (id); // no super nice desing but ok for the moment client_get_index = 0; @@ -568,13 +559,14 @@ RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id) * corrsponding peer to the client. * Only used internally */ - const struct GNUNET_PeerIdentity * -RPS_sampler_get_rand_peer_ () + void +RPS_sampler_get_rand_peer_ (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { + struct GetPeerCls *gpc; uint32_t r_index; - const struct GNUNET_PeerIdentity *peer; // do we have to malloc that? + struct GNUNET_HashCode *hash; - // TODO implement extra logic + gpc = (struct GetPeerCls *) cls; /**; * Choose the r_index of the peer we want to return @@ -583,50 +575,25 @@ RPS_sampler_get_rand_peer_ () r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, sampler->sampler_size); - //if ( EMPTY == sampler->sampler_elements[r_index]->is_empty ) - // // TODO schedule for later - // peer = NULL; - //else - peer = &(sampler->sampler_elements[r_index]->peer_id); - //sampler->sampler_elements[r_index]->last_client_request = GNUNET_TIME_absolute_get(); - LOG (GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer)); - - return peer; -} - - -/** - * Get n random peers out of the sampled peers. - * - * We might want to reinitialise this sampler after giving the - * corrsponding peer to the client. - * Random with or without consumption? - * Only used internally - */ - const struct GNUNET_PeerIdentity * -RPS_sampler_get_n_rand_peers_ (uint32_t n) -{ - if ( 0 == sampler->sampler_size ) + if ( EMPTY == sampler->sampler_elements[r_index]->is_empty ) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sgrp: List empty - Returning NULL\n"); - return NULL; + gpc->get_peer_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply( + GNUNET_TIME_UNIT_SECONDS, + .1), + &RPS_sampler_get_rand_peer_, + cls); + return; } - else - { - // TODO check if we have too much (distinct) sampled peers - // If we are not ready yet maybe schedule for later - struct GNUNET_PeerIdentity *peers; - uint32_t i; - peers = GNUNET_malloc (n * sizeof(struct GNUNET_PeerIdentity)); + *gpc->id = sampler->sampler_elements[r_index]->peer_id; - for ( i = 0 ; i < n ; i++ ) { - //peers[i] = RPS_sampler_get_rand_peer_(sampler->sampler_elements); - memcpy (&peers[i], RPS_sampler_get_rand_peer_ (), sizeof (struct GNUNET_PeerIdentity)); - } - return peers; - } + hash = GNUNET_new (struct GNUNET_HashCode); + GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash); + if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove (get_peer_tasks, hash, &gpc->get_peer_task)) + LOG (GNUNET_ERROR_TYPE_WARNING, "SAMPLER: Key to remove is not in the hashmap\n"); + GNUNET_free (gpc->get_peer_task); + + gpc->cb (gpc->cb_cls, gpc->id); } @@ -639,28 +606,28 @@ RPS_sampler_get_n_rand_peers_ (uint32_t n) * @return a random PeerID of the PeerIDs previously put into the sampler. */ void -//RPS_sampler_get_rand_peer (RPS_sampler_rand_peer_ready_cb cb, -// void *cls, struct GNUNET_PeerIdentity *id) RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GetPeerCls *gpc; + struct GNUNET_PeerIdentity tmp_id; struct RPS_SamplerElement *s_elem; struct GNUNET_TIME_Relative last_request_diff; struct GNUNET_HashCode *hash; uint32_t tmp_client_get_index; - //struct GNUNET_TIME_Relative inv_last_request_diff; LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Single peer was requested\n"); gpc = (struct GetPeerCls *) cls; hash = GNUNET_new (struct GNUNET_HashCode); + + /* Store the next #client_get_index to check whether we cycled over the whole list */ if (0 < client_get_index) tmp_client_get_index = client_get_index - 1; else tmp_client_get_index = sampler->sampler_size - 1; LOG (GNUNET_ERROR_TYPE_DEBUG, - "SAMPLER: scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ".\n", + "SAMPLER: scheduling for later if index reaches %" PRIX32 " (sampler size: %" PRIX32 ").\n", tmp_client_get_index, sampler->sampler_size); do @@ -674,17 +641,22 @@ RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext return; } - *gpc->id = sampler->sampler_elements[client_get_index]->peer_id; - + tmp_id = sampler->sampler_elements[client_get_index]->peer_id; RPS_sampler_elem_reinit (sampler->sampler_elements[client_get_index]); + RPS_sampler_elem_next (sampler->sampler_elements[client_get_index], &tmp_id, + NULL, NULL, NULL, NULL); + + /* Cycle the #client_get_index one step further */ if ( client_get_index == sampler->sampler_size - 1 ) client_get_index = 0; else client_get_index++; + LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: incremented index to %" PRIX32 ".\n", client_get_index); } while (EMPTY == sampler->sampler_elements[client_get_index]->is_empty); s_elem = sampler->sampler_elements[client_get_index]; + *gpc->id = s_elem->peer_id; /* Check whether we may use this sampler to give it back to the client */ if (GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us != s_elem->last_client_request.abs_value_us) @@ -729,54 +701,49 @@ RPS_sampler_get_rand_peer (void *cls, const struct GNUNET_SCHEDULER_TaskContext * * @param cb callback that will be called once the ids are ready. * @param cls closure given to @a cb + * @param for_client #GNUNET_YES if result is used for client, + * #GNUNET_NO if used internally * @param num_peers the number of peers requested */ void RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, - void *cls, uint32_t num_peers) + void *cls, uint32_t num_peers, int for_client) { - if ( 0 == sampler->sampler_size ) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sgrp: List empty - Returning NULL\n"); - cb (cls, NULL, 0); - } - else - { - // TODO check if we have too much (distinct) sampled peers - // If we are not ready yet maybe schedule for later - uint32_t i; - struct RPS_GetNRandPeersReadyCls *cb_cls; - struct GetPeerCls *gpc; - struct GNUNET_HashCode *hash; - - hash = GNUNET_new (struct GNUNET_HashCode); + GNUNET_assert (0 != sampler->sampler_size); - cb_cls = GNUNET_new (struct RPS_GetNRandPeersReadyCls); - cb_cls->num_peers = num_peers; - cb_cls->cur_num_peers = 0; - cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); - cb_cls->callback = cb; - cb_cls->cls = cls; + // TODO check if we have too much (distinct) sampled peers + uint32_t i; + struct NRandPeersReadyCls *cb_cls; + struct GetPeerCls *gpc; + struct GNUNET_HashCode *hash; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "SAMPLER: Scheduling requests for %" PRIX32 " peers\n", num_peers); + hash = GNUNET_new (struct GNUNET_HashCode); - for ( i = 0 ; i < num_peers ; i++ ) - { - gpc = GNUNET_new (struct GetPeerCls); - gpc->cb = RPS_sampler_get_n_rand_peers_ready_cb; - gpc->cb_cls = cb_cls; - gpc->id = &cb_cls->ids[i]; + cb_cls = GNUNET_new (struct NRandPeersReadyCls); + cb_cls->num_peers = num_peers; + cb_cls->cur_num_peers = 0; + cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); + cb_cls->callback = cb; + cb_cls->cls = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "SAMPLER: Scheduling requests for %" PRIX32 " peers\n", num_peers); - // maybe add a little delay + for ( i = 0 ; i < num_peers ; i++ ) + { + gpc = GNUNET_new (struct GetPeerCls); + gpc->cb = check_n_peers_ready; + gpc->cb_cls = cb_cls; + gpc->id = &cb_cls->ids[i]; + + // maybe add a little delay + if (GNUNET_YES == for_client) gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer, gpc); - GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash); - (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, &gpc->get_peer_task, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - //RPS_sampler_get_rand_peer (RPS_sampler_get_n_rand_peers_ready_cb, - // cb_cls, &peers[i]); - } + else if (GNUNET_NO == for_client) + gpc->get_peer_task = GNUNET_SCHEDULER_add_now (&RPS_sampler_get_rand_peer_, gpc); + GNUNET_CRYPTO_hash (&gpc->get_peer_task, sizeof (struct GNUNET_SCHEDULER_Task *), hash); + (void) GNUNET_CONTAINER_multihashmap_put (get_peer_tasks, hash, &gpc->get_peer_task, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); } } diff --git a/src/rps/gnunet-service-rps_sampler.h b/src/rps/gnunet-service-rps_sampler.h index 451d3cdb0..a7021b4fe 100644 --- a/src/rps/gnunet-service-rps_sampler.h +++ b/src/rps/gnunet-service-rps_sampler.h @@ -89,7 +89,6 @@ RPS_sampler_resize (unsigned int new_size); */ void RPS_sampler_init (size_t init_size, - const struct GNUNET_PeerIdentity *id, struct GNUNET_TIME_Relative max_round_interval, RPS_sampler_insert_cb ins_cb, void *ins_cls, RPS_sampler_remove_cb rem_cb, void *rem_cls); @@ -115,18 +114,6 @@ RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id); RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id); -/** - * Get n random peers out of the sampled peers. - * - * We might want to reinitialise this sampler after giving the - * corrsponding peer to the client. - * Random with or without consumption? - * Only used internally - */ - const struct GNUNET_PeerIdentity * -RPS_sampler_get_n_rand_peers_ (uint32_t n); - - /** * Get n random peers out of the sampled peers. * @@ -136,11 +123,13 @@ RPS_sampler_get_n_rand_peers_ (uint32_t n); * * @param cb callback that will be called once the ids are ready. * @param cls closure given to @a cb + * @param for_client #GNUNET_YES if result is used for client, + * #GNUNET_NO if used internally * @param num_peers the number of peers requested */ void RPS_sampler_get_n_rand_peers (RPS_sampler_n_rand_peers_ready_cb cb, - void *cls, uint32_t num_peers); + void *cls, uint32_t num_peers, int for_client); /** -- 2.25.1