static uint64_t num_view_updates;
/**
- * @brief Number of updates we want to receive
+ * @brief Number of peers we want to receive from stream
*/
static uint64_t num_stream_peers;
*/
static void
stream_input_handle (void *cls,
- const struct GNUNET_PeerIdentity *recv_peer)
+ uint64_t num_peers,
+ const struct GNUNET_PeerIdentity *recv_peers)
{
- // TODO when source of peer is sent, also print source
- FPRINTF (stdout, "%s\n",
- GNUNET_i2s_full (recv_peer));
+ uint64_t i;
+ (void) cls;
+
+ if (0 == num_peers)
+ {
+ FPRINTF (stdout, "Empty view\n");
+ }
+ req_handle = NULL;
+ for (i = 0; i < num_peers; i++)
+ {
+ FPRINTF (stdout, "%s\n",
+ GNUNET_i2s_full (&recv_peers[i]));
+
+ if (1 == num_stream_peers)
+ {
+ ret = 0;
+ GNUNET_SCHEDULER_shutdown ();
+ break;
+ }
+ else if (1 < num_stream_peers)
+ {
+ num_stream_peers--;
+ }
+ }
}
out_msg->num_peers = htonl (view_size);
GNUNET_memcpy (&out_msg[1],
- view_array,
- view_size * sizeof (struct GNUNET_PeerIdentity));
+ view_array,
+ view_size * sizeof (struct GNUNET_PeerIdentity));
GNUNET_MQ_send (cli_ctx->mq, ev);
}
/**
* @brief Send peer from biased stream to client.
*
+ * TODO merge with send_view, parameterise
+ *
* @param cli_ctx the context of the client
* @param view_array the peerids of the view as array (can be empty)
* @param view_size the size of the view array (can be 0)
*/
void
send_stream_peer (const struct ClientContext *cli_ctx,
- const struct GNUNET_PeerIdentity *peer)
+ uint64_t num_peers,
+ const struct GNUNET_PeerIdentity *peers)
{
struct GNUNET_MQ_Envelope *ev;
struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
- GNUNET_assert (NULL != peer);
+ GNUNET_assert (NULL != peers);
- ev = GNUNET_MQ_msg (out_msg,
- GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
+ ev = GNUNET_MQ_msg_extra (out_msg,
+ num_peers * sizeof (struct GNUNET_PeerIdentity),
+ GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
+ out_msg->num_peers = htonl (num_peers);
- GNUNET_memcpy (&out_msg->peer,
- peer,
- sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_memcpy (&out_msg[1],
+ peers,
+ num_peers * sizeof (struct GNUNET_PeerIdentity));
GNUNET_MQ_send (cli_ctx->mq, ev);
}
* @brief sends updates to clients that are interested
*/
static void
-clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer)
- //enum StreamPeerSource)
+clients_notify_stream_peer (uint64_t num_peers,
+ const struct GNUNET_PeerIdentity *peers)
+ // TODO enum StreamPeerSource)
{
struct ClientContext *cli_ctx_iter;
+ uint64_t num_peers_send;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Got peer (%s) from biased stream - update all clients\n",
- GNUNET_i2s (peer));
+ GNUNET_i2s (peers));
- /* check size of view is small enough */
for (cli_ctx_iter = cli_ctx_head;
NULL != cli_ctx_iter;
cli_ctx_iter = cli_ctx_head->next)
{
- if (1 < cli_ctx_iter->stream_peers_left)
+ if (0 < cli_ctx_iter->stream_peers_left)
{
/* Client wants to receive limited amount of updates */
- cli_ctx_iter->stream_peers_left -= 1;
- } else if (1 == cli_ctx_iter->stream_peers_left)
+ if (num_peers > cli_ctx_iter->stream_peers_left)
+ {
+ num_peers_send = num_peers - cli_ctx_iter->stream_peers_left;
+ cli_ctx_iter->stream_peers_left = 0;
+ }
+ else
+ {
+ num_peers_send = cli_ctx_iter->stream_peers_left - num_peers;
+ cli_ctx_iter->stream_peers_left -= num_peers_send;
+ }
+ } else if (0 > cli_ctx_iter->stream_peers_left)
{
- /* Last update of view for client */
- cli_ctx_iter->stream_peers_left = -1;
- } else if (0 > cli_ctx_iter->stream_peers_left) {
/* Client is not interested in updates */
continue;
+ } else /* _updates_left == 0 - infinite amount of updates */
+ {
+ num_peers_send = num_peers;
}
- /* else _updates_left == 0 - infinite amount of updates */
/* send view */
- send_stream_peer (cli_ctx_iter, peer);
+ send_stream_peer (cli_ctx_iter, num_peers_send, peers);
}
}
inserted = insert_in_view (&ids[i]);
if (GNUNET_OK == inserted)
{
- clients_notify_stream_peer (&ids[i]);
+ clients_notify_stream_peer (1, &ids[i]);
}
to_file (file_name_view_log,
"+%s\t(hist)",
inserted = insert_in_view (peer);
if (GNUNET_OK == inserted)
{
- clients_notify_stream_peer (peer);
+ clients_notify_stream_peer (1, peer);
}
}
permut[i]));
if (GNUNET_OK == inserted)
{
- clients_notify_stream_peer (
+ clients_notify_stream_peer (1,
CustomPeerMap_get_peer_by_index (push_map, permut[i]));
}
to_file (file_name_view_log,
permut[i - first_border]));
if (GNUNET_OK == inserted)
{
- clients_notify_stream_peer (
+ clients_notify_stream_peer (1,
CustomPeerMap_get_peer_by_index (push_map, permut[i]));
}
to_file (file_name_view_log,
}
+/**
+ * This function is called, when the service sends another peer from the biased
+ * stream.
+ * It calls the callback the caller provided
+ * and disconnects afterwards.
+ *
+ * TODO merge with check_view_update
+ *
+ * @param msg the message
+ */
+static int
+check_stream_input (void *cls,
+ const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
+{
+ uint16_t msize = ntohs (msg->header.size);
+ uint32_t num_peers = ntohl (msg->num_peers);
+ (void) cls;
+
+ msize -= sizeof (struct GNUNET_RPS_CS_DEBUG_StreamReply);
+ if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
+ (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
/**
* This function is called, when the service sends another peer from the biased
* stream.
const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
{
struct GNUNET_RPS_Handle *h = cls;
+ const struct GNUNET_PeerIdentity *peers;
/* Give the peers back */
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "New peer of biased input stream\n");
+ "New peer of %" PRIu64 " biased input stream\n",
+ ntohl (msg->num_peers));
+ peers = (struct GNUNET_PeerIdentity *) &msg[1];
GNUNET_assert (NULL != h);
GNUNET_assert (NULL != h->stream_input_cb);
- h->stream_input_cb (h->stream_input_cb, &msg->peer);
+ h->stream_input_cb (h->stream_input_cb, ntohl (msg->num_peers), peers);
}
GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
struct GNUNET_RPS_CS_DEBUG_ViewReply,
h),
- GNUNET_MQ_hd_fixed_size (stream_input,
- GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
- struct GNUNET_RPS_CS_DEBUG_StreamReply,
- h),
+ GNUNET_MQ_hd_var_size (stream_input,
+ GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
+ struct GNUNET_RPS_CS_DEBUG_StreamReply,
+ h),
GNUNET_MQ_handler_end ()
};