/*
This file is part of GNUnet.
- Copyright (C) 2009, 2012 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2009, 2012 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
/**
* How many peers do we start?
*/
-uint32_t num_peers;
+static uint32_t num_peers;
/**
* How long do we run the test?
*/
unsigned int num_pending_reps;
- /**
- * Received PeerIDs
- */
- struct GNUNET_PeerIdentity *rec_ids;
-
/**
* Number of received PeerIDs
*/
- unsigned int num_rec_ids;
+ unsigned int num_recv_ids;
};
*/
static int ok;
-
/**
* Identifier for the churn task that runs periodically
*/
static struct GNUNET_SCHEDULER_Task *churn_task;
-/**
- * Identifier for the churn task that runs periodically
- */
-static struct GNUNET_SCHEDULER_Task *shutdown_task;
-
-
/**
* Called to initialise the given RPSPeer
*/
* Append arguments to file
*/
static void
-tofile_ (const char *file_name, char *line)
+tofile_ (const char *file_name, const char *line)
{
struct GNUNET_DISK_FileHandle *f;
/* char output_buffer[512]; */
if (size != size2)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Unable to write to file! (Size: %u, size2: %u)\n",
+ "Unable to write to file! (Size: %lu, size2: %lu)\n",
size,
size2);
+ if (GNUNET_YES != GNUNET_DISK_file_close (f))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Unable to close file\n");
+ }
return;
}
if (GNUNET_YES != GNUNET_DISK_file_close (f))
+ {
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Unable to close file\n");
+ }
}
/**
/**
- * Write the ids and their according index in the given array to a file
+ * Write the ids and their according index in the given array to a file
* Unused
*/
/* static void
"%u. peer [%s] received %u of %u expected peer_ids: %i\n",
i,
GNUNET_i2s (rps_peers[i].peer_id),
- rps_peers[i].num_rec_ids,
+ rps_peers[i].num_recv_ids,
rps_peers[i].num_ids_to_request,
- (rps_peers[i].num_ids_to_request == rps_peers[i].num_rec_ids));
- tmp_ok &= (rps_peers[i].num_ids_to_request == rps_peers[i].num_rec_ids);
+ (rps_peers[i].num_ids_to_request == rps_peers[i].num_recv_ids));
+ tmp_ok &= (rps_peers[i].num_ids_to_request == rps_peers[i].num_recv_ids);
}
return tmp_ok? 0 : 1;
}
* Task run on timeout to shut everything down.
*/
static void
-shutdown_op (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_op (void *cls)
{
unsigned int i;
in_shutdown = GNUNET_YES;
if (NULL != churn_task)
+ {
GNUNET_SCHEDULER_cancel (churn_task);
-
- for (i = 0 ; i < num_peers ; i++)
- GNUNET_TESTBED_operation_done (rps_peers[i].op);
+ churn_task = NULL;
+ }
+ for (i = 0; i < num_peers; i++)
+ if (NULL != rps_peers[i].op)
+ GNUNET_TESTBED_operation_done (rps_peers[i].op);
GNUNET_SCHEDULER_shutdown ();
}
/**
* Seed peers.
*/
- void
-seed_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+static void
+seed_peers (void *cls)
{
+ struct RPSPeer *peer = cls;
unsigned int amount;
- struct RPSPeer *peer = (struct RPSPeer *) cls;
unsigned int i;
// TODO if malicious don't seed mal peers
}
+/**
+ * Seed peers.
+ */
+static void
+seed_peers_big (void *cls)
+{
+ struct RPSPeer *peer = cls;
+ unsigned int seed_msg_size;
+ uint32_t num_peers_max;
+ unsigned int amount;
+ unsigned int i;
+
+ seed_msg_size = 8; /* sizeof (struct GNUNET_RPS_CS_SeedMessage) */
+ num_peers_max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - seed_msg_size) /
+ sizeof (struct GNUNET_PeerIdentity);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peers that fit in one seed msg; %u\n",
+ num_peers_max);
+ amount = num_peers_max + (0.5 * num_peers_max);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Seeding many (%u) peers:\n",
+ amount);
+ struct GNUNET_PeerIdentity ids_to_seed[amount];
+ for (i = 0; i < amount; i++)
+ {
+ ids_to_seed[i] = *peer->peer_id;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding %u. peer: %s\n",
+ i,
+ GNUNET_i2s (&ids_to_seed[i]));
+ }
+
+ GNUNET_RPS_seed_ids (peer->rps_handle, amount, ids_to_seed);
+}
+
/**
* Get the id of peer i.
*/
if (NULL == pinfo || NULL != emsg)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Got Error: %s\n", emsg);
+ GNUNET_TESTBED_operation_done (entry->op);
return;
}
rps_peer_ids[entry->index] = *(pinfo->result.id);
rps_peers[entry->index].peer_id = &rps_peer_ids[entry->index];
- rps_peers[entry->index].rec_ids = NULL;
- rps_peers[entry->index].num_rec_ids = 0;
- GNUNET_CONTAINER_multipeermap_put (peer_map,
- &rps_peer_ids[entry->index],
- &rps_peers[entry->index],
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multipeermap_put (peer_map,
+ &rps_peer_ids[entry->index],
+ &rps_peers[entry->index],
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
tofile ("/tmp/rps/peer_ids",
"%u\t%s\n",
entry->index,
GNUNET_i2s_full (&rps_peer_ids[entry->index]));
- if (NULL != cur_test_run.init_peer)
- cur_test_run.init_peer (&rps_peers[entry->index]);
-
- GNUNET_TESTBED_operation_done (entry->op);
GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry);
+ GNUNET_TESTBED_operation_done (entry->op);
GNUNET_free (entry);
}
rps_disconnect_adapter (void *cls,
void *op_result)
{
+ struct RPSPeer *peer = cls;
struct GNUNET_RPS_Handle *h = op_result;
+ GNUNET_assert (NULL != peer);
GNUNET_RPS_disconnect (h);
+ peer->rps_handle = NULL;
}
"[%s] got %" PRIu64 " peers:\n",
GNUNET_i2s (rps_peer->peer_id),
n);
-
+
for (i = 0; i < n; i++)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
i,
GNUNET_i2s (&recv_peers[i]));
- GNUNET_array_append (rps_peer->rec_ids, rps_peer->num_rec_ids, recv_peers[i]);
+ rps_peer->num_recv_ids++;
}
if (0 == evaluate ())
{
- GNUNET_SCHEDULER_cancel (shutdown_task);
- shutdown_task = GNUNET_SCHEDULER_add_now (&shutdown_op, NULL);
+ GNUNET_SCHEDULER_shutdown ();
}
}
* Request random peers.
*/
static void
-request_peers (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+request_peers (void *cls)
{
+ struct PendingRequest *pending_req = cls;
struct RPSPeer *rps_peer;
- struct PendingRequest *pending_req = (struct PendingRequest *) cls;
struct PendingReply *pending_rep;
if (GNUNET_YES == in_shutdown)
* Cancel a request.
*/
static void
-cancel_request_cb (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+cancel_request_cb (void *cls)
{
+ struct RPSPeer *rps_peer = cls;
struct PendingReply *pending_rep;
- struct RPSPeer *rps_peer = (struct RPSPeer *) cls;
if (GNUNET_YES == in_shutdown)
return;
unsigned int i;
struct PendingRequest *pending_req;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Scheduling %u - %u missing requests\n",
+ rps_peer->num_ids_to_request,
+ rps_peer->num_pending_reqs + rps_peer->num_pending_reps);
+ GNUNET_assert (rps_peer->num_pending_reqs + rps_peer->num_pending_reps <=
+ rps_peer->num_ids_to_request);
for (i = rps_peer->num_pending_reqs + rps_peer->num_pending_reps;
i < rps_peer->num_ids_to_request; i++)
{
void
cancel_pending_req_rep (struct RPSPeer *rps_peer)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Cancelling all (pending) requests.\n");
while (NULL != rps_peer->pending_req_head)
cancel_pending_req (rps_peer->pending_req_head);
GNUNET_assert (0 == rps_peer->num_pending_reqs);
static void
seed_cb (struct RPSPeer *rps_peer)
{
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10),
- seed_peers, rps_peer);
+ GNUNET_SCHEDULER_add_delayed (
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10),
+ seed_peers, rps_peer);
}
/***********************************
seed_big_cb (struct RPSPeer *rps_peer)
{
// TODO test seeding > GNUNET_SERVER_MAX_MESSAGE_SIZE peers
+ GNUNET_SCHEDULER_add_delayed (
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
+ seed_peers_big, rps_peer);
}
/***********************************
static void
seed_req_cb (struct RPSPeer *rps_peer)
{
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
- seed_peers, rps_peer);
+ GNUNET_SCHEDULER_add_delayed (
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2),
+ seed_peers, rps_peer);
schedule_missing_requests (rps_peer);
}
i,
j,
GNUNET_i2s (rps_peers[j].peer_id),
- (delta < 0)? "online" : "offline");
+ (0 > delta) ? "online" : "offline");
if (prob < prob_go_on_off * UINT32_MAX)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%s goes %s\n",
GNUNET_i2s (rps_peers[j].peer_id),
- (delta < 0) ? "offline" : "online");
+ (0 > delta) ? "offline" : "online");
+ if (0 > delta)
+ cancel_pending_req_rep (&rps_peers[j]);
entry = make_oplist_entry ();
entry->delta = delta;
entry->index = j;
"rps",
&churn_cb,
entry,
- (delta < 0) ? 0 : 1);
+ (0 > delta) ? 0 : 1);
}
}
+
static void
-churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+churn (void *cls)
{
unsigned int i;
unsigned int j;
/* If online, shut down with certain probability */
if (GNUNET_YES == rps_peers[j].online)
{
- cancel_pending_req_rep (&rps_peers[j]);
manage_service_wrapper (i, j, -1, prob_go_offline);
}
"%s\n",
GNUNET_i2s_full (&recv_peers[i]));
rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, &recv_peers[i]);
+ GNUNET_assert (NULL != rcv_rps_peer);
tofile (file_name_dh,
"%" PRIu32 "\n",
(uint32_t) rcv_rps_peer->index);
}
- /* Find #PendingReply holding the request handle */
- GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head,
- rps_peer->pending_rep_tail,
- pending_rep);
- rps_peer->num_pending_reps--;
+ default_reply_handle (cls, n, recv_peers);
}
{
RPS_sampler_elem_next (s_elem, &rps_peer_ids[i]);
}
+ RPS_sampler_elem_destroy (s_elem);
}
return GNUNET_OK;
}
{
unsigned int i;
struct OpListEntry *entry;
+ uint32_t num_mal_peers;
testbed_peers = peers;
num_peers_online = 0;
- for (i = 0 ; i < num_peers ; i++)
+ for (i = 0; i < num_peers; i++)
{
entry = make_oplist_entry ();
entry->index = i;
+ rps_peers[i].index = i;
+ if (NULL != cur_test_run.init_peer)
+ cur_test_run.init_peer (&rps_peers[i]);
entry->op = GNUNET_TESTBED_peer_get_information (peers[i],
GNUNET_TESTBED_PIT_IDENTITY,
&info_cb,
entry);
}
+ num_mal_peers = round (portion * num_peers);
GNUNET_assert (num_peers == n_peers);
- for (i = 0 ; i < n_peers ; i++)
+ for (i = 0; i < n_peers; i++)
{
rps_peers[i].index = i;
- rps_peers[i].op =
- GNUNET_TESTBED_service_connect (&rps_peers[i],
- peers[i],
- "rps",
- &rps_connect_complete_cb,
- &rps_peers[i],
- &rps_connect_adapter,
- &rps_disconnect_adapter,
- &rps_peers[i]);
+ if ( (rps_peers[i].num_recv_ids < rps_peers[i].num_ids_to_request) ||
+ (i < num_mal_peers) )
+ {
+ rps_peers[i].op =
+ GNUNET_TESTBED_service_connect (&rps_peers[i],
+ peers[i],
+ "rps",
+ &rps_connect_complete_cb,
+ &rps_peers[i],
+ &rps_connect_adapter,
+ &rps_disconnect_adapter,
+ &rps_peers[i]);
+ }
}
if (NULL != churn_task)
GNUNET_SCHEDULER_cancel (churn_task);
- shutdown_task = GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_op, NULL);
+ GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_op, NULL);
}
else if (strstr (argv[0], "_seed_big") != NULL)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding (num_peers > GNUNET_SERVER_MAX_MESSAGE_SIZE)\n");
+ num_peers = 1;
cur_test_run.name = "test-rps-seed-big";
cur_test_run.main_test = seed_big_cb;
+ cur_test_run.eval_cb = no_eval;
+ timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10);
}
else if (strstr (argv[0], "_single_peer_seed") != NULL)
(3 == mal_type))
target_peer = &rps_peer_ids[num_peers - 2];
if (profiler_eval == cur_test_run.eval_cb)
- eval_peer = &rps_peers[num_peers - 1];
+ eval_peer = &rps_peers[num_peers - 1]; /* FIXME: eval_peer could be a
+ malicious peer if not careful
+ with the malicious portion */
ok = 1;
(void) GNUNET_TESTBED_test_run (cur_test_run.name,