From 6048d2a10a95822d06b5d7be640adc89a895b01a Mon Sep 17 00:00:00 2001 From: =?utf8?q?Julius=20B=C3=BCnger?= Date: Sun, 14 Oct 2018 13:35:23 +0200 Subject: [PATCH] RPS API: Remove numer of peers from stream request --- src/include/gnunet_rps_service.h | 3 -- src/rps/gnunet-rps.c | 31 +------------ src/rps/rps_api.c | 79 ++++---------------------------- src/rps/test_rps.c | 1 - 4 files changed, 10 insertions(+), 104 deletions(-) diff --git a/src/include/gnunet_rps_service.h b/src/include/gnunet_rps_service.h index 7fdfe491e..274ca94a2 100644 --- a/src/include/gnunet_rps_service.h +++ b/src/include/gnunet_rps_service.h @@ -180,14 +180,11 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle, * Request biased stream of peers that are being put into the sampler * * @param rps_handle handle to the rps service - * @param num_req_peers number of peers we want to receive - * (0 for infinite updates) * @param cls a closure that will be given to the callback * @param ready_cb the callback called when the peers are available */ struct GNUNET_RPS_StreamRequestHandle * GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, - uint32_t num_updates, GNUNET_RPS_NotifyReadyCB stream_input_cb, void *cls); diff --git a/src/rps/gnunet-rps.c b/src/rps/gnunet-rps.c index d0f905f51..49189481f 100644 --- a/src/rps/gnunet-rps.c +++ b/src/rps/gnunet-rps.c @@ -58,11 +58,6 @@ static int stream_input; */ static uint64_t num_view_updates; -/** - * @brief Number of peers we want to receive from stream - */ -static uint64_t num_stream_peers; - /** * Task run when user presses CTRL-C to abort. @@ -162,24 +157,13 @@ stream_input_handle (void *cls, if (0 == num_peers) { - FPRINTF (stdout, "Empty view\n"); + FPRINTF (stdout, "No peer was returned\n"); } req_handle = NULL; for (i = 0; i < num_peers; i++) { FPRINTF (stdout, "%s\n", GNUNET_i2s_full (&recv_peers[i])); - - if (1 == num_stream_peers) - { - ret = 0; - GNUNET_SCHEDULER_shutdown (); - break; - } - else if (1 < num_stream_peers) - { - num_stream_peers--; - } } } @@ -243,18 +227,7 @@ run (void *cls, } else if (stream_input) { /* Get updates of view */ - if (NULL == args[0] || - 0 == sscanf (args[0], "%lu", &num_stream_peers)) - { - num_stream_peers = 0; - } - GNUNET_RPS_stream_request (rps_handle, num_stream_peers, stream_input_handle, NULL); - if (0 != num_stream_peers) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Requesting %" PRIu64 " peers from biased stream\n", num_stream_peers); - else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Requesting continuous peers from biased stream\n"); + GNUNET_RPS_stream_request (rps_handle, stream_input_handle, NULL); GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); } else diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index 5c68e4337..02d833506 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c @@ -41,11 +41,6 @@ struct GNUNET_RPS_StreamRequestHandle */ struct GNUNET_RPS_Handle *rps_handle; - /** - * The number of requested peers. - */ - uint32_t num_peers_left; - /** * The callback to be called when we receive an answer. */ @@ -188,7 +183,6 @@ struct cb_cls_pack */ static struct GNUNET_RPS_StreamRequestHandle * new_stream_request (struct GNUNET_RPS_Handle *rps_handle, - uint64_t num_peers, GNUNET_RPS_NotifyReadyCB ready_cb, void *cls) { @@ -197,7 +191,6 @@ new_stream_request (struct GNUNET_RPS_Handle *rps_handle, srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle); srh->rps_handle = rps_handle; - srh->num_peers_left = num_peers; srh->ready_cb = ready_cb; srh->ready_cb_cls = cls; GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head, @@ -327,14 +320,11 @@ GNUNET_RPS_view_request_cancel (struct GNUNET_RPS_Handle *rps_handle) * Request biased stream of peers that are being put into the sampler * * @param rps_handle handle to the rps service - * @param num_req_peers number of peers we want to receive - * (0 for infinite updates) * @param cls a closure that will be given to the callback * @param ready_cb the callback called when the peers are available */ struct GNUNET_RPS_StreamRequestHandle * GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, - uint32_t num_peers, GNUNET_RPS_NotifyReadyCB stream_input_cb, void *cls) { @@ -343,12 +333,9 @@ GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle, struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg; srh = new_stream_request (rps_handle, - num_peers, /* num requests */ stream_input_cb, cls); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Client requests %" PRIu32 " biased stream updates\n", - num_peers); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requests biased stream updates\n"); ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST); GNUNET_MQ_send (rps_handle->mq, ev); @@ -492,68 +479,21 @@ handle_stream_input (void *cls, { struct GNUNET_RPS_Handle *h = cls; const struct GNUNET_PeerIdentity *peers; - /* The following two pointers are used to prevent that new handles are - * inserted into the DLL, that is currently iterated over, from within a call - * to that handler_cb, are executed and in turn again add themselves to the - * iterated DLL infinitely */ - struct GNUNET_RPS_StreamRequestHandle *srh_head_tmp; - struct GNUNET_RPS_StreamRequestHandle *srh_tail_tmp; uint64_t num_peers; - uint64_t num_peers_return; peers = (struct GNUNET_PeerIdentity *) &msg[1]; num_peers = ntohl (msg->num_peers); LOG (GNUNET_ERROR_TYPE_DEBUG, "Received %" PRIu64 " peer(s) from stream input.\n", num_peers); - srh_head_tmp = h->stream_requests_head; - srh_tail_tmp = h->stream_requests_tail; - h->stream_requests_head = NULL; - h->stream_requests_tail = NULL; - for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp; + for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head; NULL != srh_iter; srh_iter = srh_iter->next) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Calling srh - left: %" PRIu64 "\n", - srh_iter->num_peers_left); - if (0 == srh_iter->num_peers_left) /* infinite updates */ - { - num_peers_return = num_peers; - } - else if (num_peers > srh_iter->num_peers_left) - { - num_peers_return = num_peers - srh_iter->num_peers_left; - } - else /* num_peers <= srh_iter->num_peers_left */ - { - num_peers_return = srh_iter->num_peers_left - num_peers; - } + LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n"); srh_iter->ready_cb (srh_iter->ready_cb_cls, - num_peers_return, + num_peers, peers); - if (0 == srh_iter->num_peers_left) ; - else if (num_peers_return >= srh_iter->num_peers_left) - { - remove_stream_request (srh_iter, - srh_head_tmp, - srh_tail_tmp); - } - else - { - srh_iter->num_peers_left -= num_peers_return; - } - } - for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp; - NULL != srh_iter; - srh_iter = srh_iter->next) - { - GNUNET_CONTAINER_DLL_remove (srh_head_tmp, - srh_tail_tmp, - srh_iter); - GNUNET_CONTAINER_DLL_insert (h->stream_requests_head, - h->stream_requests_tail, - srh_iter); } if (NULL == h->stream_requests_head) @@ -738,7 +678,6 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle *rps_handle, peers_ready_cb, rh); rh->srh = GNUNET_RPS_stream_request (rps_handle, - 0, /* infinite updates */ collect_peers_cb, rh); /* cls */ rh->ready_cb = ready_cb; @@ -913,12 +852,10 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) struct GNUNET_RPS_Handle *h; h = rh->rps_handle; - if (NULL != rh->srh) - { - remove_stream_request (rh->srh, - h->stream_requests_head, - h->stream_requests_tail); - } + GNUNET_assert (NULL != rh->srh); + remove_stream_request (rh->srh, + h->stream_requests_head, + h->stream_requests_tail); if (NULL == h->stream_requests_head) cancel_stream(h); if (NULL != rh->sampler_rh) { diff --git a/src/rps/test_rps.c b/src/rps/test_rps.c index 1c98a1e5e..0740d01df 100644 --- a/src/rps/test_rps.c +++ b/src/rps/test_rps.c @@ -1606,7 +1606,6 @@ sub_pre (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h) if (0 != rps_peer->index) GNUNET_RPS_sub_start (h, "test"); else GNUNET_RPS_sub_start (h, "lonely"); /* have a group of one */ rps_peer->rps_srh = GNUNET_RPS_stream_request (h, - 0, &got_stream_peer_cb, rps_peer); } -- 2.25.1