*/
static uint64_t num_view_updates;
-/**
- * @brief Number of peers we want to receive from stream
- */
-static uint64_t num_stream_peers;
-
/**
* Task run when user presses CTRL-C to abort.
if (0 == num_peers)
{
- FPRINTF (stdout, "Empty view\n");
+ FPRINTF (stdout, "No peer was returned\n");
}
req_handle = NULL;
for (i = 0; i < num_peers; i++)
{
FPRINTF (stdout, "%s\n",
GNUNET_i2s_full (&recv_peers[i]));
-
- if (1 == num_stream_peers)
- {
- ret = 0;
- GNUNET_SCHEDULER_shutdown ();
- break;
- }
- else if (1 < num_stream_peers)
- {
- num_stream_peers--;
- }
}
}
} else if (stream_input)
{
/* Get updates of view */
- if (NULL == args[0] ||
- 0 == sscanf (args[0], "%lu", &num_stream_peers))
- {
- num_stream_peers = 0;
- }
- GNUNET_RPS_stream_request (rps_handle, num_stream_peers, stream_input_handle, NULL);
- if (0 != num_stream_peers)
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Requesting %" PRIu64 " peers from biased stream\n", num_stream_peers);
- else
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Requesting continuous peers from biased stream\n");
+ GNUNET_RPS_stream_request (rps_handle, stream_input_handle, NULL);
GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
}
else
*/
struct GNUNET_RPS_Handle *rps_handle;
- /**
- * The number of requested peers.
- */
- uint32_t num_peers_left;
-
/**
* The callback to be called when we receive an answer.
*/
*/
static struct GNUNET_RPS_StreamRequestHandle *
new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
- uint64_t num_peers,
GNUNET_RPS_NotifyReadyCB ready_cb,
void *cls)
{
srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle);
srh->rps_handle = rps_handle;
- srh->num_peers_left = num_peers;
srh->ready_cb = ready_cb;
srh->ready_cb_cls = cls;
GNUNET_CONTAINER_DLL_insert (rps_handle->stream_requests_head,
* Request biased stream of peers that are being put into the sampler
*
* @param rps_handle handle to the rps service
- * @param num_req_peers number of peers we want to receive
- * (0 for infinite updates)
* @param cls a closure that will be given to the callback
* @param ready_cb the callback called when the peers are available
*/
struct GNUNET_RPS_StreamRequestHandle *
GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
- uint32_t num_peers,
GNUNET_RPS_NotifyReadyCB stream_input_cb,
void *cls)
{
struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
srh = new_stream_request (rps_handle,
- num_peers, /* num requests */
stream_input_cb,
cls);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Client requests %" PRIu32 " biased stream updates\n",
- num_peers);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requests biased stream updates\n");
ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
GNUNET_MQ_send (rps_handle->mq, ev);
{
struct GNUNET_RPS_Handle *h = cls;
const struct GNUNET_PeerIdentity *peers;
- /* The following two pointers are used to prevent that new handles are
- * inserted into the DLL, that is currently iterated over, from within a call
- * to that handler_cb, are executed and in turn again add themselves to the
- * iterated DLL infinitely */
- struct GNUNET_RPS_StreamRequestHandle *srh_head_tmp;
- struct GNUNET_RPS_StreamRequestHandle *srh_tail_tmp;
uint64_t num_peers;
- uint64_t num_peers_return;
peers = (struct GNUNET_PeerIdentity *) &msg[1];
num_peers = ntohl (msg->num_peers);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received %" PRIu64 " peer(s) from stream input.\n",
num_peers);
- srh_head_tmp = h->stream_requests_head;
- srh_tail_tmp = h->stream_requests_tail;
- h->stream_requests_head = NULL;
- h->stream_requests_tail = NULL;
- for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp;
+ for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = h->stream_requests_head;
NULL != srh_iter;
srh_iter = srh_iter->next)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Calling srh - left: %" PRIu64 "\n",
- srh_iter->num_peers_left);
- if (0 == srh_iter->num_peers_left) /* infinite updates */
- {
- num_peers_return = num_peers;
- }
- else if (num_peers > srh_iter->num_peers_left)
- {
- num_peers_return = num_peers - srh_iter->num_peers_left;
- }
- else /* num_peers <= srh_iter->num_peers_left */
- {
- num_peers_return = srh_iter->num_peers_left - num_peers;
- }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
srh_iter->ready_cb (srh_iter->ready_cb_cls,
- num_peers_return,
+ num_peers,
peers);
- if (0 == srh_iter->num_peers_left) ;
- else if (num_peers_return >= srh_iter->num_peers_left)
- {
- remove_stream_request (srh_iter,
- srh_head_tmp,
- srh_tail_tmp);
- }
- else
- {
- srh_iter->num_peers_left -= num_peers_return;
- }
- }
- for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = srh_head_tmp;
- NULL != srh_iter;
- srh_iter = srh_iter->next)
- {
- GNUNET_CONTAINER_DLL_remove (srh_head_tmp,
- srh_tail_tmp,
- srh_iter);
- GNUNET_CONTAINER_DLL_insert (h->stream_requests_head,
- h->stream_requests_tail,
- srh_iter);
}
if (NULL == h->stream_requests_head)
peers_ready_cb,
rh);
rh->srh = GNUNET_RPS_stream_request (rps_handle,
- 0, /* infinite updates */
collect_peers_cb,
rh); /* cls */
rh->ready_cb = ready_cb;
struct GNUNET_RPS_Handle *h;
h = rh->rps_handle;
- if (NULL != rh->srh)
- {
- remove_stream_request (rh->srh,
- h->stream_requests_head,
- h->stream_requests_tail);
- }
+ GNUNET_assert (NULL != rh->srh);
+ remove_stream_request (rh->srh,
+ h->stream_requests_head,
+ h->stream_requests_tail);
if (NULL == h->stream_requests_head) cancel_stream(h);
if (NULL != rh->sampler_rh)
{