From 3afe1a3435697b01fee557420a701fba1821dbe5 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Julius=20B=C3=BCnger?= Date: Mon, 8 Apr 2019 00:55:35 +0200 Subject: [PATCH] RPS: Retrieve more info from sampler for profiling --- src/rps/gnunet-rps-profiler.c | 91 ++++++++++++- src/rps/gnunet-service-rps_sampler.c | 2 +- src/rps/rps-sampler_client.c | 64 ++++++++- src/rps/rps-sampler_client.h | 8 +- src/rps/rps-sampler_common.c | 187 +++++++++++++++++++++++++- src/rps/rps-sampler_common.h | 56 +++++++- src/rps/rps_api.c | 193 ++++++++++++++++++++++++++- 7 files changed, 576 insertions(+), 25 deletions(-) diff --git a/src/rps/gnunet-rps-profiler.c b/src/rps/gnunet-rps-profiler.c index a852d94b1..ffc9d6f7e 100644 --- a/src/rps/gnunet-rps-profiler.c +++ b/src/rps/gnunet-rps-profiler.c @@ -429,7 +429,7 @@ struct PendingReply /** * Handle to the request we are waiting for */ - struct GNUNET_RPS_Request_Handle *req_handle; + struct GNUNET_RPS_Request_Handle_Single_Info *req_handle; /** * The peer that requested @@ -1040,7 +1040,7 @@ cancel_request (struct PendingReply *pending_rep) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Cancelling rps get reply\n"); GNUNET_assert (NULL != pending_rep->req_handle); - GNUNET_RPS_request_cancel (pending_rep->req_handle); + GNUNET_RPS_request_single_info_cancel (pending_rep->req_handle); pending_rep->req_handle = NULL; GNUNET_free (pending_rep); pending_rep = NULL; @@ -1489,6 +1489,13 @@ default_reply_handle (void *cls, } } + +static void +profiler_reply_handle_info (void *cls, + const struct GNUNET_PeerIdentity *recv_peer, + double probability, + uint32_t num_observed); + /** * Request random peers. */ @@ -1510,9 +1517,12 @@ request_peers (void *cls) "Requesting one peer\n"); pending_rep = GNUNET_new (struct PendingReply); pending_rep->rps_peer = rps_peer; - pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle, - 1, - cur_test_run.reply_handle, + //pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle, + // 1, + // cur_test_run.reply_handle, + // pending_rep); + pending_rep->req_handle = GNUNET_RPS_request_peer_info (rps_peer->rps_handle, + profiler_reply_handle_info, pending_rep); GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head, rps_peer->pending_rep_tail, @@ -1979,6 +1989,77 @@ profiler_reply_handle (void *cls, } +/** + * Callback to call on receipt of a reply + * + * @param cls closure + * @param n number of peers + * @param recv_peers the received peers + */ +static void +profiler_reply_handle_info (void *cls, + const struct GNUNET_PeerIdentity *recv_peer, + double probability, + uint32_t num_observed) +{ + struct RPSPeer *rps_peer; + struct RPSPeer *rcv_rps_peer; + char file_name_buf[128]; + char file_name_dh_buf[128]; + char file_name_dhr_buf[128]; + char file_name_dhru_buf[128]; + char *file_name = file_name_buf; + char *file_name_dh = file_name_dh_buf; + char *file_name_dhr = file_name_dhr_buf; + char *file_name_dhru = file_name_dhru_buf; + unsigned int i; + struct PendingReply *pending_rep = (struct PendingReply *) cls; + + pending_rep->req_handle = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "profiler_reply_handle()\n"); + rps_peer = pending_rep->rps_peer; + (void) GNUNET_asprintf (&file_name, + "/tmp/rps/received_ids-%u", + rps_peer->index); + + (void) GNUNET_asprintf (&file_name_dh, + "/tmp/rps/diehard_input-%u", + rps_peer->index); + (void) GNUNET_asprintf (&file_name_dhr, + "/tmp/rps/diehard_input_raw-%u", + rps_peer->index); + (void) GNUNET_asprintf (&file_name_dhru, + "/tmp/rps/diehard_input_raw_aligned-%u", + rps_peer->index); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "[%s] got peer with info:\n", + GNUNET_i2s (rps_peer->peer_id)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " %s\n", + GNUNET_i2s (recv_peer)); + tofile (file_name, + "%s %d %" PRIu32 " \n", + GNUNET_i2s_full (recv_peer), + probability, + num_observed); + rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, recv_peer); + GNUNET_assert (NULL != rcv_rps_peer); + tofile (file_name_dh, + "%" PRIu32 "\n", + (uint32_t) rcv_rps_peer->index); +#ifdef TO_FILE + to_file_raw (file_name_dhr, + (char *) &rcv_rps_peer->index, + sizeof (uint32_t)); + to_file_raw_unaligned (file_name_dhru, + (char *) &rcv_rps_peer->index, + sizeof (uint32_t), + bits_needed); +#endif /* TO_FILE */ + default_reply_handle (cls, 1, recv_peer); +} + + static void profiler_cb (struct RPSPeer *rps_peer) { diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c index a95ac82d4..e17b154ca 100644 --- a/src/rps/gnunet-service-rps_sampler.c +++ b/src/rps/gnunet-service-rps_sampler.c @@ -257,7 +257,7 @@ sampler_get_rand_peer (void *cls) gpc->req_handle->gpc_tail, gpc); *gpc->id = sampler->sampler_elements[r_index]->peer_id; - gpc->cont (gpc->cont_cls, gpc->id); + gpc->cont (gpc->cont_cls, gpc->id, 0, sampler->sampler_elements[r_index]->num_peers); GNUNET_free (gpc); } diff --git a/src/rps/rps-sampler_client.c b/src/rps/rps-sampler_client.c index 0de25df07..61f9b6385 100644 --- a/src/rps/rps-sampler_client.c +++ b/src/rps/rps-sampler_client.c @@ -158,6 +158,46 @@ struct RPS_SamplerRequestHandle void *cls; }; + +/** + * Closure to _get_rand_peer_info() + */ +struct RPS_SamplerRequestHandleSingleInfo +{ + /** + * DLL + */ + struct RPS_SamplerRequestHandleSingleInfo *next; + struct RPS_SamplerRequestHandleSingleInfo *prev; + + /** + * Pointer to the id + */ + struct GNUNET_PeerIdentity *id; + + /** + * Head and tail for the DLL to store the tasks for single requests + */ + struct GetPeerCls *gpc_head; + struct GetPeerCls *gpc_tail; + + /** + * Sampler. + */ + struct RPS_Sampler *sampler; + + /** + * Callback to be called when all ids are available. + */ + RPS_sampler_sinlge_info_ready_cb callback; + + /** + * Closure given to the callback + */ + void *cls; +}; + + ///** // * Global sampler variable. // */ @@ -269,7 +309,12 @@ sampler_mod_get_rand_peer (void *cls) gpc->get_peer_task = NULL; gpc->notify_ctx = NULL; - sampler = gpc->req_handle->sampler; + GNUNET_assert ( (NULL != gpc->req_handle) || + (NULL != gpc->req_single_info_handle) ); + if (NULL != gpc->req_handle) + sampler = gpc->req_handle->sampler; + else + sampler = gpc->req_single_info_handle->sampler; LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); @@ -362,10 +407,19 @@ sampler_mod_get_rand_peer (void *cls) RPS_sampler_elem_reinit (s_elem); s_elem->last_client_request = GNUNET_TIME_absolute_get (); - GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, - gpc->req_handle->gpc_tail, - gpc); - gpc->cont (gpc->cont_cls, gpc->id); + if (NULL != gpc->req_handle) + { + GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, + gpc->req_handle->gpc_tail, + gpc); + } + else + { + GNUNET_CONTAINER_DLL_remove (gpc->req_single_info_handle->gpc_head, + gpc->req_single_info_handle->gpc_tail, + gpc); + } + gpc->cont (gpc->cont_cls, gpc->id, prob_observed_n, s_elem->num_peers); GNUNET_free (gpc); } diff --git a/src/rps/rps-sampler_client.h b/src/rps/rps-sampler_client.h index 1b425b754..680fabfda 100644 --- a/src/rps/rps-sampler_client.h +++ b/src/rps/rps-sampler_client.h @@ -40,6 +40,11 @@ struct RPS_Sampler; */ struct RPS_SamplerRequestHandle; +/** + * Closure to _get_rand_peer_info() + */ +struct RPS_SamplerRequestHandleSingleInfo; + /** * Get the size of the sampler. @@ -108,8 +113,6 @@ RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler, * @param sampler the sampler to get peers from. * @param cb callback that will be called once the ids are ready. * @param cls closure given to @a cb - * @param for_client #GNUNET_YES if result is used for client, - * #GNUNET_NO if used internally * @param num_peers the number of peers requested */ struct RPS_SamplerRequestHandle * @@ -118,6 +121,7 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, RPS_sampler_n_rand_peers_ready_cb cb, void *cls); + /** * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. * diff --git a/src/rps/rps-sampler_common.c b/src/rps/rps-sampler_common.c index f54de9014..adb69e1b5 100644 --- a/src/rps/rps-sampler_common.c +++ b/src/rps/rps-sampler_common.c @@ -115,6 +115,45 @@ struct RPS_SamplerRequestHandle }; +/** + * Closure to _get_rand_peer_info() + */ +struct RPS_SamplerRequestHandleSingleInfo +{ + /** + * DLL + */ + struct RPS_SamplerRequestHandleSingleInfo *next; + struct RPS_SamplerRequestHandleSingleInfo *prev; + + /** + * Pointer to the id + */ + struct GNUNET_PeerIdentity *id; + + /** + * Head and tail for the DLL to store the tasks for single requests + */ + struct GetPeerCls *gpc_head; + struct GetPeerCls *gpc_tail; + + /** + * Sampler. + */ + struct RPS_Sampler *sampler; + + /** + * Callback to be called when all ids are available. + */ + RPS_sampler_sinlge_info_ready_cb callback; + + /** + * Closure given to the callback + */ + void *cls; +}; + + /** * @brief Update the current estimate of the network size stored at the sampler * @@ -415,12 +454,20 @@ sampler_empty (struct RPS_Sampler *sampler) /** * Callback to _get_rand_peer() used by _get_n_rand_peers(). * + * Implements #RPS_sampler_rand_peer_ready_cont + * * Checks whether all n peers are available. If they are, * give those back. + * @param cls Closure + * @param id Peer ID + * @param probability The probability with which this sampler has seen all ids + * @param num_observed How many ids this sampler has observed */ static void check_n_peers_ready (void *cls, - const struct GNUNET_PeerIdentity *id) + const struct GNUNET_PeerIdentity *id, + double probability, + uint32_t num_observed) { struct RPS_SamplerRequestHandle *req_handle = cls; (void) id; @@ -428,6 +475,8 @@ check_n_peers_ready (void *cls, struct GNUNET_PeerIdentity *peers; uint32_t num_peers; void *cb_cls; + (void) probability; + (void) num_observed; req_handle->cur_num_peers++; LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -459,6 +508,53 @@ check_n_peers_ready (void *cls, } +/** + * Callback to _get_rand_peer() used by _get_rand_peer_info(). + * + * Implements #RPS_sampler_rand_peer_ready_cont + * + * @param cls Closure + * @param id Peer ID + * @param probability The probability with which this sampler has seen all ids + * @param num_observed How many ids this sampler has observed + */ +static void +check_peer_info_ready (void *cls, + const struct GNUNET_PeerIdentity *id, + double probability, + uint32_t num_observed) +{ + struct RPS_SamplerRequestHandleSingleInfo *req_handle = cls; + (void) id; + RPS_sampler_sinlge_info_ready_cb tmp_cb; + struct GNUNET_PeerIdentity *peer; + void *cb_cls; + (void) probability; + (void) num_observed; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got single peer with additional info\n"); + + GNUNET_assert (NULL != req_handle->callback); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "returning single peer with info to the client\n"); + + /* Copy pointers and peers temporarily as they + * might be deleted from within the callback */ + tmp_cb = req_handle->callback; + peer = GNUNET_new (struct GNUNET_PeerIdentity); + GNUNET_memcpy (peer, + req_handle->id, + sizeof (struct GNUNET_PeerIdentity)); + cb_cls = req_handle->cls; + RPS_sampler_request_single_info_cancel (req_handle); + req_handle = NULL; + tmp_cb (peer, cb_cls, probability, num_observed); + GNUNET_free (peer); +} + + /** * Get n random peers out of the sampled peers. * @@ -469,8 +565,6 @@ check_n_peers_ready (void *cls, * @param sampler the sampler to get peers from. * @param cb callback that will be called once the ids are ready. * @param cls closure given to @a cb - * @param for_client #GNUNET_YES if result is used for client, - * #GNUNET_NO if used internally * @param num_peers the number of peers requested */ struct RPS_SamplerRequestHandle * @@ -506,6 +600,7 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, { gpc = GNUNET_new (struct GetPeerCls); gpc->req_handle = req_handle; + gpc->req_single_info_handle = NULL; gpc->cont = check_n_peers_ready; gpc->cont_cls = req_handle; gpc->id = &req_handle->ids[i]; @@ -515,11 +610,56 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, gpc); // maybe add a little delay gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, - gpc); + gpc); } return req_handle; } + +/** + * Get one random peer with additional information. + * + * @param sampler the sampler to get peers from. + * @param cb callback that will be called once the ids are ready. + * @param cls closure given to @a cb + */ +struct RPS_SamplerRequestHandleSingleInfo * +RPS_sampler_get_rand_peer_info (struct RPS_Sampler *sampler, + RPS_sampler_sinlge_info_ready_cb cb, + void *cls) +{ + struct RPS_SamplerRequestHandleSingleInfo *req_handle; + struct GetPeerCls *gpc; + + GNUNET_assert (0 != sampler->sampler_size); + + // TODO check if we have too much (distinct) sampled peers + req_handle = GNUNET_new (struct RPS_SamplerRequestHandleSingleInfo); + req_handle->id = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity)); + req_handle->sampler = sampler; + req_handle->callback = cb; + req_handle->cls = cls; + GNUNET_CONTAINER_DLL_insert (sampler->req_handle_single_head, + sampler->req_handle_single_tail, + req_handle); + + gpc = GNUNET_new (struct GetPeerCls); + gpc->req_handle = NULL; + gpc->req_single_info_handle = req_handle; + gpc->cont = check_peer_info_ready; + gpc->cont_cls = req_handle; + gpc->id = req_handle->id; + + GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head, + req_handle->gpc_tail, + gpc); + // maybe add a little delay + gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, + gpc); + return req_handle; +} + + /** * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. * @@ -558,6 +698,45 @@ RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle) } +/** + * Cancle a request issued through #RPS_sampler_sinlge_info_ready_cb. + * + * @param req_handle the handle to the request + */ +void +RPS_sampler_request_single_info_cancel ( + struct RPS_SamplerRequestHandleSingleInfo *req_single_info_handle) +{ + struct GetPeerCls *i; + + while (NULL != (i = req_single_info_handle->gpc_head) ) + { + GNUNET_CONTAINER_DLL_remove (req_single_info_handle->gpc_head, + req_single_info_handle->gpc_tail, + i); + if (NULL != i->get_peer_task) + { + GNUNET_SCHEDULER_cancel (i->get_peer_task); + } + if (NULL != i->notify_ctx) + { + GNUNET_CONTAINER_DLL_remove (req_single_info_handle->sampler->notify_ctx_head, + req_single_info_handle->sampler->notify_ctx_tail, + i->notify_ctx); + GNUNET_free (i->notify_ctx); + i->notify_ctx = NULL; + } + GNUNET_free (i); + } + GNUNET_free (req_single_info_handle->id); + req_single_info_handle->id = NULL; + GNUNET_CONTAINER_DLL_remove (req_single_info_handle->sampler->req_handle_single_head, + req_single_info_handle->sampler->req_handle_single_tail, + req_single_info_handle); + GNUNET_free (req_single_info_handle); +} + + /** * Cleans the sampler. */ diff --git a/src/rps/rps-sampler_common.h b/src/rps/rps-sampler_common.h index 1abe43720..321efaf1e 100644 --- a/src/rps/rps-sampler_common.h +++ b/src/rps/rps-sampler_common.h @@ -44,10 +44,14 @@ * * @param cls the closure given alongside this function. * @param id the PeerID that was returned + * @param probability The probability with which this sampler has seen all ids + * @param num_observed How many ids this sampler has observed */ typedef void (*RPS_sampler_rand_peer_ready_cont) (void *cls, - const struct GNUNET_PeerIdentity *id); + const struct GNUNET_PeerIdentity *id, + double probability, + uint32_t num_observed); /** @@ -71,6 +75,22 @@ typedef void void *cls); +/** + * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready. + * + * @param cls the closure given alongside this function. + * @param probability Probability with which all IDs have been observed + * @param num_observed Number of observed IDs + * @param ids the PeerIDs that were returned + * to be freed + */ + typedef void +(*RPS_sampler_sinlge_info_ready_cb) (const struct GNUNET_PeerIdentity *ids, + void *cls, + double probability, + uint32_t num_observed); + + /** * @brief Callback called each time a new peer was put into the sampler * @@ -96,6 +116,11 @@ struct GetPeerCls */ struct RPS_SamplerRequestHandle *req_handle; + /** + * The #RPS_SamplerRequestHandleSingleInfo this single request belongs to. + */ + struct RPS_SamplerRequestHandleSingleInfo *req_single_info_handle; + /** * The task for this function. */ @@ -177,6 +202,12 @@ struct RPS_Sampler struct RPS_SamplerRequestHandle *req_handle_head; struct RPS_SamplerRequestHandle *req_handle_tail; + /** + * Head and tail for the DLL to store the #RPS_SamplerRequestHandleSingleInfo + */ + struct RPS_SamplerRequestHandleSingleInfo *req_handle_single_head; + struct RPS_SamplerRequestHandleSingleInfo *req_handle_single_tail; + struct SamplerNotifyUpdateCTX *notify_ctx_head; struct SamplerNotifyUpdateCTX *notify_ctx_tail; }; @@ -305,6 +336,19 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, void *cls); +/** + * Get one random peer with additional information. + * + * @param sampler the sampler to get peers from. + * @param cb callback that will be called once the ids are ready. + * @param cls closure given to @a cb + */ +struct RPS_SamplerRequestHandleSingleInfo * +RPS_sampler_get_rand_peer_info (struct RPS_Sampler *sampler, + RPS_sampler_sinlge_info_ready_cb cb, + void *cls); + + /** * Counts how many Samplers currently hold a given PeerID. * @@ -327,6 +371,16 @@ void RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle); +/** + * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. + * + * @param req_handle the handle to the request + */ +void +RPS_sampler_request_single_info_cancel ( + struct RPS_SamplerRequestHandleSingleInfo *req_single_info_handle); + + /** * Cleans the sampler. */ diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index 7a3adfa94..83dff27e8 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c @@ -127,6 +127,16 @@ struct GNUNET_RPS_Handle */ struct GNUNET_RPS_Request_Handle *rh_tail; + /** + * @brief Pointer to the head element in DLL of single request handles + */ + struct GNUNET_RPS_Request_Handle_Single_Info *rhs_head; + + /** + * @brief Pointer to the tail element in DLL of single request handles + */ + struct GNUNET_RPS_Request_Handle_Single_Info *rhs_tail; + /** * @brief The desired probability with which we want to have observed all * peers. @@ -196,6 +206,54 @@ struct GNUNET_RPS_Request_Handle }; +/** + * Handler for a single request from a client. + */ +struct GNUNET_RPS_Request_Handle_Single_Info +{ + /** + * The client issuing the request. + */ + struct GNUNET_RPS_Handle *rps_handle; + + /** + * @brief The Sampler for the client request + */ + struct RPS_Sampler *sampler; + + /** + * @brief Request handle of the request to the sampler - needed to cancel the request + */ + struct RPS_SamplerRequestHandleSingleInfo *sampler_rh; + + /** + * @brief Request handle of the request of the biased stream of peers - + * needed to cancel the request + */ + struct GNUNET_RPS_StreamRequestHandle *srh; + + /** + * The callback to be called when we receive an answer. + */ + GNUNET_RPS_NotifyReadySingleInfoCB ready_cb; + + /** + * The closure for the callback. + */ + void *ready_cb_cls; + + /** + * @brief Pointer to next element in DLL + */ + struct GNUNET_RPS_Request_Handle_Single_Info *next; + + /** + * @brief Pointer to previous element in DLL + */ + struct GNUNET_RPS_Request_Handle_Single_Info *prev; +}; + + /** * Struct used to pack the callback, its closure (provided by the caller) * and the connection handler to the service to pass it to a callback function. @@ -308,6 +366,36 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers, } +/** + * @brief Called once the sampler has collected the requested peer. + * + * Calls the callback provided by the client with the corresponding cls. + * + * @param peers The array of @a num_peers that has been returned. + * @param num_peers The number of peers that have been returned + * @param cls The #GNUNET_RPS_Request_Handle + * @param probability Probability with which all IDs have been observed + * @param num_observed Number of observed IDs + */ +static void +peer_info_ready_cb (const struct GNUNET_PeerIdentity *peers, + void *cls, + double probability, + uint32_t num_observed) +{ + struct GNUNET_RPS_Request_Handle *rh = cls; + (void) probability; + (void) num_observed; + uint32_t num_peers = 1; + + rh->sampler_rh = NULL; + rh->ready_cb (rh->ready_cb_cls, + num_peers, + peers); + GNUNET_RPS_request_cancel (rh); +} + + /** * @brief Callback to collect the peers from the biased stream and put those * into the sampler. @@ -632,15 +720,15 @@ mq_error_handler (void *cls, */ static void hash_from_share_val (const char *share_val, - struct GNUNET_HashCode *hash) + struct GNUNET_HashCode *hash) { GNUNET_CRYPTO_kdf (hash, - sizeof (struct GNUNET_HashCode), - "rps", - strlen ("rps"), - share_val, - strlen (share_val), - NULL, 0); + sizeof (struct GNUNET_HashCode), + "rps", + strlen ("rps"), + share_val, + strlen (share_val), + NULL, 0); } @@ -672,6 +760,13 @@ nse_cb (void *cls, RPS_sampler_update_with_nw_size (rh_iter->sampler, GNUNET_NSE_log_estimate_to_n (logestimate)); } + for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head; + NULL != rhs_iter && NULL != rhs_iter->next; + rhs_iter = rhs_iter->next) + { + RPS_sampler_update_with_nw_size (rhs_iter->sampler, + GNUNET_NSE_log_estimate_to_n (logestimate)); + } } @@ -856,6 +951,48 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, } +/** + * Request one random peer, getting additional information. + * + * @param rps_handle handle to the rps service + * @param ready_cb the callback called when the peers are available + * @param cls closure given to the callback + * @return a handle to cancel this request + */ +struct GNUNET_RPS_Request_Handle_Single_Info * +GNUNET_RPS_request_peer_info (struct GNUNET_RPS_Handle *rps_handle, + GNUNET_RPS_NotifyReadySingleInfoCB ready_cb, + void *cls) +{ + struct GNUNET_RPS_Request_Handle_Single_Info *rhs; + uint32_t num_req_peers = 1; + + LOG (GNUNET_ERROR_TYPE_INFO, + "Client requested peer with additional info\n"); + rhs = GNUNET_new (struct GNUNET_RPS_Request_Handle_Single_Info); + rhs->rps_handle = rps_handle; + rhs->sampler = RPS_sampler_mod_init (num_req_peers, + GNUNET_TIME_UNIT_SECONDS); // TODO remove this time-stuff + RPS_sampler_set_desired_probability (rhs->sampler, + rps_handle->desired_probability); + RPS_sampler_set_deficiency_factor (rhs->sampler, + rps_handle->deficiency_factor); + rhs->sampler_rh = RPS_sampler_get_rand_peer_info (rhs->sampler, + peer_info_ready_cb, + rhs); + rhs->srh = GNUNET_RPS_stream_request (rps_handle, + collect_peers_cb, + rhs); /* cls */ + rhs->ready_cb = ready_cb; + rhs->ready_cb_cls = cls; + GNUNET_CONTAINER_DLL_insert (rps_handle->rhs_head, + rps_handle->rhs_tail, + rhs); + + return rhs; +} + + /** * Seed rps service with peerIDs. * @@ -1046,6 +1183,37 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) } +/** + * Cancle an issued single info request. + * + * @param rhs request handle of request to cancle + */ +void +GNUNET_RPS_request_single_info_cancel ( + struct GNUNET_RPS_Request_Handle_Single_Info *rhs) +{ + struct GNUNET_RPS_Handle *h; + + h = rhs->rps_handle; + GNUNET_assert (NULL != rhs); + GNUNET_assert (NULL != rhs->srh); + GNUNET_assert (h == rhs->srh->rps_handle); + GNUNET_RPS_stream_cancel (rhs->srh); + rhs->srh = NULL; + if (NULL == h->stream_requests_head) cancel_stream(h); + if (NULL != rhs->sampler_rh) + { + RPS_sampler_request_single_info_cancel (rhs->sampler_rh); + } + RPS_sampler_destroy (rhs->sampler); + rhs->sampler = NULL; + GNUNET_CONTAINER_DLL_remove (h->rhs_head, + h->rhs_tail, + rhs); + GNUNET_free (rhs); +} + + /** * Disconnect from the rps service * @@ -1079,6 +1247,17 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h) GNUNET_RPS_request_cancel (rh_iter); } } + if (NULL != h->rhs_head) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Not all requests were cancelled!\n"); + for (struct GNUNET_RPS_Request_Handle_Single_Info *rhs_iter = h->rhs_head; + h->rhs_head != NULL; + rhs_iter = h->rhs_head) + { + GNUNET_RPS_request_single_info_cancel (rhs_iter); + } + } if (NULL != srh_callback_peers) { GNUNET_free (srh_callback_peers); -- 2.25.1