*/
struct GNUNET_RPS_Handle *rps_handle;
+ /**
+ * Handle to stream requests
+ */
+ struct GNUNET_RPS_StreamRequestHandle *rps_srh;
+
/**
* ID of the peer.
*/
struct RPSPeer *peer = cls;
struct GNUNET_RPS_Handle *h = op_result;
+ if (NULL != peer->rps_srh)
+ {
+ GNUNET_RPS_stream_cancel (peer->rps_srh);
+ peer->rps_srh = NULL;
+ }
GNUNET_assert (NULL != peer);
GNUNET_RPS_disconnect (h);
peer->rps_handle = NULL;
* SUB
***********************************/
-void sub_post (struct RPSPeer *rps_peer)
+static void
+got_stream_peer_cb (void *cls,
+ uint64_t num_peers,
+ const struct GNUNET_PeerIdentity *peers)
{
- GNUNET_RPS_sub_stop (rps_peer->rps_handle, "test");
+ const struct RPSPeer *rps_peer = cls;
+
+ for (uint64_t i = 0; i < num_peers; i++)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer %" PRIu32 " received [%s] from stream.\n",
+ rps_peer->index,
+ GNUNET_i2s (&peers[i]));
+ if (0 != rps_peer->index &&
+ 0 == memcmp (&peers[i],
+ &rps_peers[0].peer_id,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Received a peer id outside sub\n");
+ ok = 1;
+ }
+ }
}
+
static void
-sub_stop_op (void *cls)
+sub_post (struct RPSPeer *rps_peer)
{
- struct GNUNET_RPS_Handle *h = cls;
-
- GNUNET_RPS_sub_stop (h, "test");
+ if (0 != rps_peer->index) GNUNET_RPS_sub_stop (rps_peer->rps_handle, "test");
}
static void
{
(void) rps_peer;
- GNUNET_RPS_sub_start (h, "test");
+ if (0 != rps_peer->index) GNUNET_RPS_sub_start (h, "test");
+ rps_peer->rps_srh = GNUNET_RPS_stream_request (h,
+ 0,
+ &got_stream_peer_cb,
+ rps_peer);
}
/***********************************
}
ret_value = cur_test_run.eval_cb();
-
+
if (NO_COLLECT_VIEW == cur_test_run.have_collect_view)
{
GNUNET_array_grow (rps_peers->cur_view,
return ret_value;
}
+
/* end of test_rps.c */