*/
static int view_update;
+/**
+ * @brief Do we want to receive updates of the view? (Option --view)
+ */
+static int stream_input;
+
/**
* @brief Number of updates we want to receive
*/
static uint64_t num_view_updates;
+/**
+ * @brief Number of updates we want to receive
+ */
+static uint64_t num_stream_peers;
+
/**
* Task run when user presses CTRL-C to abort.
}
+/**
+ * Callback called on receipt of peer from biased stream
+ *
+ * @param n number of peers
+ * @param recv_peers the received peers
+ */
+static void
+stream_input_handle (void *cls,
+ const struct GNUNET_PeerIdentity *recv_peer)
+{
+ // TODO when source of peer is sent, also print source
+ FPRINTF (stdout, "%s\n",
+ GNUNET_i2s_full (recv_peer));
+}
+
+
/**
* Main function that will be run by the scheduler.
*
}
if ((0 == memcmp (&zero_pid, &peer_id, sizeof (peer_id))) &&
- (!view_update))
+ (!view_update) &&
+ (!stream_input))
{ /* Request n PeerIDs */
/* If number was specified use it, else request single peer. */
if (NULL == args[0] ||
"Requesting %" PRIu64 " view updates\n", num_view_updates);
else
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Requesting contiuous view updates\n");
+ "Requesting continuous view updates\n");
+ GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
+ } else if (stream_input)
+ {
+ /* Get updates of view */
+ if (NULL == args[0] ||
+ 0 == sscanf (args[0], "%lu", &num_stream_peers))
+ {
+ num_stream_peers = 0;
+ }
+ GNUNET_RPS_stream_request (rps_handle, num_stream_peers, stream_input_handle, NULL);
+ if (0 != num_stream_peers)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Requesting %" PRIu64 " peers from biased stream\n", num_stream_peers);
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Requesting continuous peers from biased stream\n");
GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
}
else
"view",
gettext_noop ("Get updates of view (0 for infinite updates)"),
&view_update),
+ GNUNET_GETOPT_option_flag ('S',
+ "stream",
+ gettext_noop ("Get peers from biased stream"),
+ &stream_input),
GNUNET_GETOPT_OPTION_END
};
return (GNUNET_OK ==
*/
int64_t view_updates_left;
+ /**
+ * @brief How many peers from the biased
+ * stream this client expects to receive.
+ */
+ int64_t stream_peers_left;
+
/**
* The client handle to send the reply to
*/
return ret;
}
+
+/**
+ * @brief Send view to client
+ *
+ * @param cli_ctx the context of the client
+ * @param view_array the peerids of the view as array (can be empty)
+ * @param view_size the size of the view array (can be 0)
+ */
+void
+send_view (const struct ClientContext *cli_ctx,
+ const struct GNUNET_PeerIdentity *view_array,
+ uint64_t view_size)
+{
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
+
+ if (NULL == view_array)
+ {
+ view_size = View_size ();
+ view_array = View_get_as_array();
+ }
+
+ ev = GNUNET_MQ_msg_extra (out_msg,
+ view_size * sizeof (struct GNUNET_PeerIdentity),
+ GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
+ out_msg->num_peers = htonl (view_size);
+
+ GNUNET_memcpy (&out_msg[1],
+ view_array,
+ view_size * sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_MQ_send (cli_ctx->mq, ev);
+}
+
+
+/**
+ * @brief Send peer from biased stream to client.
+ *
+ * @param cli_ctx the context of the client
+ * @param view_array the peerids of the view as array (can be empty)
+ * @param view_size the size of the view array (can be 0)
+ */
+void
+send_stream_peer (const struct ClientContext *cli_ctx,
+ const struct GNUNET_PeerIdentity *peer)
+{
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
+
+ GNUNET_assert (NULL != peer);
+
+ ev = GNUNET_MQ_msg (out_msg,
+ GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
+
+ GNUNET_memcpy (&out_msg->peer,
+ peer,
+ sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_MQ_send (cli_ctx->mq, ev);
+}
+
+
/**
* @brief sends updates to clients that are interested
*/
static void
-clients_notify_view_update (void);
+clients_notify_view_update (void)
+{
+ struct ClientContext *cli_ctx_iter;
+ uint64_t num_peers;
+ const struct GNUNET_PeerIdentity *view_array;
+
+ num_peers = View_size ();
+ view_array = View_get_as_array();
+ /* check size of view is small enough */
+ if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "View is too big to send\n");
+ return;
+ }
+
+ for (cli_ctx_iter = cli_ctx_head;
+ NULL != cli_ctx_iter;
+ cli_ctx_iter = cli_ctx_head->next)
+ {
+ if (1 < cli_ctx_iter->view_updates_left)
+ {
+ /* Client wants to receive limited amount of updates */
+ cli_ctx_iter->view_updates_left -= 1;
+ } else if (1 == cli_ctx_iter->view_updates_left)
+ {
+ /* Last update of view for client */
+ cli_ctx_iter->view_updates_left = -1;
+ } else if (0 > cli_ctx_iter->view_updates_left) {
+ /* Client is not interested in updates */
+ continue;
+ }
+ /* else _updates_left == 0 - infinite amount of updates */
+
+ /* send view */
+ send_view (cli_ctx_iter, view_array, num_peers);
+ }
+}
+
+
+/**
+ * @brief sends updates to clients that are interested
+ */
+static void
+clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer)
+ //enum StreamPeerSource)
+{
+ struct ClientContext *cli_ctx_iter;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Got peer (%s) from biased stream - update all clients\n",
+ GNUNET_i2s (peer));
+
+ /* check size of view is small enough */
+ for (cli_ctx_iter = cli_ctx_head;
+ NULL != cli_ctx_iter;
+ cli_ctx_iter = cli_ctx_head->next)
+ {
+ if (1 < cli_ctx_iter->stream_peers_left)
+ {
+ /* Client wants to receive limited amount of updates */
+ cli_ctx_iter->stream_peers_left -= 1;
+ } else if (1 == cli_ctx_iter->stream_peers_left)
+ {
+ /* Last update of view for client */
+ cli_ctx_iter->stream_peers_left = -1;
+ } else if (0 > cli_ctx_iter->stream_peers_left) {
+ /* Client is not interested in updates */
+ continue;
+ }
+ /* else _updates_left == 0 - infinite amount of updates */
+
+ /* send view */
+ send_stream_peer (cli_ctx_iter, peer);
+ }
+}
/**
* Put random peer from sampler into the view as history update.
for (i = 0; i < num_peers; i++)
{
- (void) insert_in_view (&ids[i]);
+ int inserted;
+ inserted = insert_in_view (&ids[i]);
+ if (GNUNET_OK == inserted)
+ {
+ clients_notify_stream_peer (&ids[i]);
+ }
to_file (file_name_view_log,
"+%s\t(hist)",
GNUNET_i2s_full (ids));
const struct GNUNET_PeerIdentity *peer)
{
(void) cls;
- (void) insert_in_view (peer);
+ int inserted;
+
+ inserted = insert_in_view (peer);
+ if (GNUNET_OK == inserted)
+ {
+ clients_notify_stream_peer (peer);
+ }
}
GNUNET_SERVICE_client_continue (cli_ctx->client);
}
-/**
- * @brief Send view to client
- *
- * @param cli_ctx the context of the client
- * @param view_array the peerids of the view as array (can be empty)
- * @param view_size the size of the view array (can be 0)
- */
-void
-send_view (const struct ClientContext *cli_ctx,
- const struct GNUNET_PeerIdentity *view_array,
- uint64_t view_size)
-{
- struct GNUNET_MQ_Envelope *ev;
- struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
-
- if (NULL == view_array)
- {
- view_size = View_size ();
- view_array = View_get_as_array();
- }
-
- ev = GNUNET_MQ_msg_extra (out_msg,
- view_size * sizeof (struct GNUNET_PeerIdentity),
- GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
- out_msg->num_peers = htonl (view_size);
-
- GNUNET_memcpy (&out_msg[1],
- view_array,
- view_size * sizeof (struct GNUNET_PeerIdentity));
- GNUNET_MQ_send (cli_ctx->mq, ev);
-}
/**
- * @brief sends updates to clients that are interested
+ * Handle RPS request from the client.
+ *
+ * @param cls closure
+ * @param message the actual message
*/
static void
-clients_notify_view_update (void)
+handle_client_view_request (void *cls,
+ const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
{
- struct ClientContext *cli_ctx_iter;
- uint64_t num_peers;
- const struct GNUNET_PeerIdentity *view_array;
+ struct ClientContext *cli_ctx = cls;
+ uint64_t num_updates;
- num_peers = View_size ();
- view_array = View_get_as_array();
- /* check size of view is small enough */
- if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "View is too big to send\n");
- return;
- }
+ num_updates = ntohl (msg->num_updates);
- for (cli_ctx_iter = cli_ctx_head;
- NULL != cli_ctx_iter;
- cli_ctx_iter = cli_ctx_head->next)
- {
- if (1 < cli_ctx_iter->view_updates_left)
- {
- /* Client wants to receive limited amount of updates */
- cli_ctx_iter->view_updates_left -= 1;
- } else if (1 == cli_ctx_iter->view_updates_left)
- {
- /* Last update of view for client */
- cli_ctx_iter->view_updates_left = -1;
- } else if (0 > cli_ctx_iter->view_updates_left) {
- /* Client is not interested in updates */
- continue;
- }
- /* else _updates_left == 0 - infinite amount of updates */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Client requested %" PRIu64 " updates of view.\n",
+ num_updates);
- /* send view */
- send_view (cli_ctx_iter, view_array, num_peers);
- }
+ GNUNET_assert (NULL != cli_ctx);
+ cli_ctx->view_updates_left = num_updates;
+ send_view (cli_ctx, NULL, 0);
+ GNUNET_SERVICE_client_continue (cli_ctx->client);
}
/**
- * Handle RPS request from the client.
+ * Handle RPS request for biased stream from the client.
*
* @param cls closure
* @param message the actual message
*/
static void
-handle_client_view_request (void *cls,
- const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
+handle_client_stream_request (void *cls,
+ const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
{
struct ClientContext *cli_ctx = cls;
- uint64_t num_updates;
+ uint64_t num_peers;
- num_updates = ntohl (msg->num_updates);
+ num_peers = ntohl (msg->num_peers);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Client requested %" PRIu64 " updates of view.\n",
- num_updates);
+ "Client requested %" PRIu64 " peers from biased stream.\n",
+ num_peers);
GNUNET_assert (NULL != cli_ctx);
- cli_ctx->view_updates_left = num_updates;
- send_view (cli_ctx, NULL, 0);
+ cli_ctx->stream_peers_left = num_peers;
GNUNET_SERVICE_client_continue (cli_ctx->client);
}
CustomPeerMap_size (push_map));
for (i = 0; i < first_border; i++)
{
- (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
- permut[i]));
+ int inserted;
+ inserted = insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
+ permut[i]));
+ if (GNUNET_OK == inserted)
+ {
+ clients_notify_stream_peer (
+ CustomPeerMap_get_peer_by_index (push_map, permut[i]));
+ }
to_file (file_name_view_log,
"+%s\t(push list)",
GNUNET_i2s_full (&view_array[i]));
CustomPeerMap_size (pull_map));
for (i = first_border; i < second_border; i++)
{
- (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
+ int inserted;
+ inserted = insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
permut[i - first_border]));
+ if (GNUNET_OK == inserted)
+ {
+ clients_notify_stream_peer (
+ CustomPeerMap_get_peer_by_index (push_map, permut[i]));
+ }
to_file (file_name_view_log,
"+%s\t(pull list)",
GNUNET_i2s_full (&view_array[i]));
GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
struct GNUNET_RPS_CS_DEBUG_ViewRequest,
NULL),
+ GNUNET_MQ_hd_fixed_size (client_stream_request,
+ GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
+ struct GNUNET_RPS_CS_DEBUG_StreamRequest,
+ NULL),
GNUNET_MQ_handler_end());
/* end of gnunet-service-rps.c */
GNUNET_RPS_ViewUpdateCB view_update_cb;
/**
- * @brief Callback called on each update of the view
+ * @brief Closure to each requested update of the view
*/
void *view_update_cls;
+
+ /**
+ * @brief Callback called on each peer of the biased input stream
+ */
+ GNUNET_RPS_StreamInputCB stream_input_cb;
+
+ /**
+ * @brief Closure to each requested peer from the biased stream
+ */
+ void *stream_input_cls;
};
GNUNET_MQ_send (rps_handle->mq, ev);
}
+
+/**
+ * Request biased stream of peers that are being put into the sampler
+ *
+ * @param rps_handle handle to the rps service
+ * @param num_req_peers number of peers we want to receive
+ * (0 for infinite updates)
+ * @param cls a closure that will be given to the callback
+ * @param ready_cb the callback called when the peers are available
+ */
+void
+GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
+ uint32_t num_peers,
+ GNUNET_RPS_StreamInputCB stream_input_cb,
+ void *cls)
+{
+ struct GNUNET_MQ_Envelope *ev;
+ struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Client requests %" PRIu32 " biased stream updates\n",
+ num_peers);
+ rps_handle->stream_input_cb = stream_input_cb;
+ rps_handle->stream_input_cls = cls;
+
+ ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
+ msg->num_peers = htonl (num_peers);
+ GNUNET_MQ_send (rps_handle->mq, ev);
+}
+
+
/**
* This function is called, when the service updates the view.
* It verifies that @a msg is well-formed.
return GNUNET_OK;
}
+
/**
* This function is called, when the service updated its view.
* It calls the callback the caller provided
}
+/**
+ * This function is called, when the service sends another peer from the biased
+ * stream.
+ * It calls the callback the caller provided
+ * and disconnects afterwards.
+ *
+ * @param msg the message
+ */
+static void
+handle_stream_input (void *cls,
+ const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
+{
+ struct GNUNET_RPS_Handle *h = cls;
+
+ /* Give the peers back */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "New peer of biased input stream\n");
+
+ GNUNET_assert (NULL != h);
+ GNUNET_assert (NULL != h->stream_input_cb);
+ h->stream_input_cb (h->stream_input_cb, &msg->peer);
+}
+
/**
* Reconnect to the service
GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
struct GNUNET_RPS_CS_DEBUG_ViewReply,
h),
+ GNUNET_MQ_hd_fixed_size (stream_input,
+ GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
+ struct GNUNET_RPS_CS_DEBUG_StreamReply,
+ h),
GNUNET_MQ_handler_end ()
};