X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Frps%2Ftest_rps.c;h=8c0be24fbcb5757ea0f3c2fbf886baecc641e036;hb=56c021a3748c30b93e4ac3a03b6dc685400960d4;hp=01777bd9079041b7e39e865dd2b5992a9967b9a9;hpb=2b70084dc824be72ecc4cc95e52490b5d0ec4d91;p=oweals%2Fgnunet.git diff --git a/src/rps/test_rps.c b/src/rps/test_rps.c index 01777bd90..8c0be24fb 100644 --- a/src/rps/test_rps.c +++ b/src/rps/test_rps.c @@ -46,6 +46,7 @@ uint32_t num_peers; //#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) static struct GNUNET_TIME_Relative timeout; + /** * Portion of malicious peers */ @@ -105,6 +106,52 @@ static struct OpListEntry *oplist_head; static struct OpListEntry *oplist_tail; +/** + * A pending reply: A request was sent and the reply is pending. + */ +struct PendingReply +{ + /** + * DLL next,prev ptr + */ + struct PendingReply *next; + struct PendingReply *prev; + + /** + * Handle to the request we are waiting for + */ + struct GNUNET_RPS_Request_Handle *req_handle; + + /** + * The peer that requested + */ + struct RPSPeer *rps_peer; +}; + + +/** + * A pending request: A request was not made yet but is scheduled for later. + */ +struct PendingRequest +{ + /** + * DLL next,prev ptr + */ + struct PendingRequest *next; + struct PendingRequest *prev; + + /** + * Handle to the request we are waiting for + */ + struct GNUNET_SCHEDULER_Task *request_task; + + /** + * The peer that requested + */ + struct RPSPeer *rps_peer; +}; + + /** * Information we track for each peer. */ @@ -141,14 +188,36 @@ struct RPSPeer int online; /** - * Received PeerIDs + * Number of Peer IDs to request + */ + unsigned int num_ids_to_request; + + /** + * Pending requests DLL + */ + struct PendingRequest *pending_req_head; + struct PendingRequest *pending_req_tail; + + /** + * Number of pending requests + */ + unsigned int num_pending_reqs; + + /** + * Pending replies DLL */ - struct GNUNET_PeerIdentity *rec_ids; + struct PendingReply *pending_rep_head; + struct PendingReply *pending_rep_tail; + + /** + * Number of pending replies + */ + unsigned int num_pending_reps; /** * Number of received PeerIDs */ - unsigned int num_rec_ids; + unsigned int num_recv_ids; }; @@ -167,6 +236,16 @@ static struct GNUNET_CONTAINER_MultiPeerMap *peer_map; */ static struct GNUNET_PeerIdentity *rps_peer_ids; +/** + * ID of the targeted peer. + */ +static struct GNUNET_PeerIdentity *target_peer; + +/** + * ID of the peer that requests for the evaluation. + */ +static struct RPSPeer *eval_peer; + /** * Number of online peers. */ @@ -183,6 +262,16 @@ static int ok; */ static struct GNUNET_SCHEDULER_Task *churn_task; +/** + * Identifier for the churn task that runs periodically + */ +static struct GNUNET_SCHEDULER_Task *shutdown_task; + + +/** + * Called to initialise the given RPSPeer + */ +typedef void (*InitPeer) (struct RPSPeer *rps_peer); /** * Called directly after connecting to the service @@ -223,6 +312,11 @@ struct SingleTestRun */ char *name; + /** + * Called to initialise peer + */ + InitPeer init_peer; + /** * Called directly after connecting to the service */ @@ -359,25 +453,23 @@ ids_to_file (char *file_name, * Test the success of a single test */ static int -evaluate (struct RPSPeer *loc_rps_peers, - unsigned int num_loc_rps_peers, - unsigned int expected_recv) +evaluate (void) { unsigned int i; int tmp_ok; - tmp_ok = (1 == loc_rps_peers[0].num_rec_ids); + tmp_ok = 1; - for (i = 0 ; i < num_loc_rps_peers ; i++) + for (i = 0; i < num_peers; i++) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%u. peer [%s] received %u of %u expected peer_ids: %i\n", - i, - GNUNET_i2s (loc_rps_peers[i].peer_id), - loc_rps_peers[i].num_rec_ids, - expected_recv, - (1 == loc_rps_peers[i].num_rec_ids)); - tmp_ok &= (1 == loc_rps_peers[i].num_rec_ids); + "%u. peer [%s] received %u of %u expected peer_ids: %i\n", + i, + GNUNET_i2s (rps_peers[i].peer_id), + rps_peers[i].num_recv_ids, + rps_peers[i].num_ids_to_request, + (rps_peers[i].num_ids_to_request == rps_peers[i].num_recv_ids)); + tmp_ok &= (rps_peers[i].num_ids_to_request == rps_peers[i].num_recv_ids); } return tmp_ok? 0 : 1; } @@ -397,85 +489,11 @@ make_oplist_entry () } -/** - * Callback to be called when RPS service is started or stopped at peers - * - * @param cls NULL - * @param op the operation handle - * @param emsg NULL on success; otherwise an error description - */ -static void -churn_cb (void *cls, - struct GNUNET_TESTBED_Operation *op, - const char *emsg) -{ - // FIXME - struct OpListEntry *entry = cls; - - GNUNET_TESTBED_operation_done (entry->op); - if (NULL != emsg) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start/stop RPS at a peer\n"); - GNUNET_SCHEDULER_shutdown (); - return; - } - GNUNET_assert (0 != entry->delta); - - num_peers_online += entry->delta; - - if (0 > entry->delta) - { /* Peer hopefully just went offline */ - if (GNUNET_YES != rps_peers[entry->index].online) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "peer %s was expected to go offline but is still marked as online\n", - GNUNET_i2s (rps_peers[entry->index].peer_id)); - GNUNET_break (0); - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "peer %s probably went offline as expected\n", - GNUNET_i2s (rps_peers[entry->index].peer_id)); - } - rps_peers[entry->index].online = GNUNET_NO; - } - - else if (0 < entry->delta) - { /* Peer hopefully just went online */ - if (GNUNET_NO != rps_peers[entry->index].online) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "peer %s was expected to go online but is still marked as offline\n", - GNUNET_i2s (rps_peers[entry->index].peer_id)); - GNUNET_break (0); - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "peer %s probably went online as expected\n", - GNUNET_i2s (rps_peers[entry->index].peer_id)); - if (NULL != cur_test_run.pre_test) - { - cur_test_run.pre_test (&rps_peers[entry->index], - rps_peers[entry->index].rps_handle); - } - } - rps_peers[entry->index].online = GNUNET_YES; - } - - GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry); - GNUNET_free (entry); - //if (num_peers_in_round[current_round] == peers_running) - // run_round (); -} - - /** * Task run on timeout to shut everything down. */ static void -shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +shutdown_op (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { unsigned int i; @@ -483,8 +501,9 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) if (NULL != churn_task) GNUNET_SCHEDULER_cancel (churn_task); - for (i = 0 ; i < num_peers ; i++) - GNUNET_TESTBED_operation_done (rps_peers[i].op); + for (i = 0; i < num_peers; i++) + if (NULL != rps_peers[i].op) + GNUNET_TESTBED_operation_done (rps_peers[i].op); GNUNET_SCHEDULER_shutdown (); } @@ -511,6 +530,39 @@ seed_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_RPS_seed_ids (peer->rps_handle, amount, rps_peer_ids); } +/** + * Seed peers. + */ + void +seed_peers_big (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct RPSPeer *peer = (struct RPSPeer *) cls; + unsigned int seed_msg_size; + uint32_t num_peers_max; + unsigned int amount; + unsigned int i; + + seed_msg_size = 8; /* sizeof (struct GNUNET_RPS_CS_SeedMessage) */ + num_peers_max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - seed_msg_size) / + sizeof (struct GNUNET_PeerIdentity); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peers that fit in one seed msg; %u\n", + num_peers_max); + amount = num_peers_max + (0.5 * num_peers_max); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Seeding many (%u) peers:\n", + amount); + struct GNUNET_PeerIdentity ids_to_seed[amount]; + for (i = 0; i < amount; i++) + { + ids_to_seed[i] = *peer->peer_id; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Seeding %u. peer: %s\n", + i, + GNUNET_i2s (&ids_to_seed[i])); + } + + GNUNET_RPS_seed_ids (peer->rps_handle, amount, ids_to_seed); +} /** * Get the id of peer i. @@ -526,6 +578,7 @@ info_cb (void *cb_cls, if (NULL == pinfo || NULL != emsg) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Got Error: %s\n", emsg); + GNUNET_TESTBED_operation_done (entry->op); return; } @@ -536,22 +589,18 @@ info_cb (void *cb_cls, rps_peer_ids[entry->index] = *(pinfo->result.id); rps_peers[entry->index].peer_id = &rps_peer_ids[entry->index]; - rps_peers[entry->index].rec_ids = NULL; - rps_peers[entry->index].num_rec_ids = 0; GNUNET_CONTAINER_multipeermap_put (peer_map, &rps_peer_ids[entry->index], &rps_peers[entry->index], GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); - tofile ("/tmp/rps/peer_ids", "%u\t%s\n", entry->index, GNUNET_i2s_full (&rps_peer_ids[entry->index])); - GNUNET_TESTBED_operation_done (entry->op); - GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry); + GNUNET_TESTBED_operation_done (entry->op); GNUNET_free (entry); } @@ -629,10 +678,13 @@ rps_connect_adapter (void *cls, */ static void rps_disconnect_adapter (void *cls, - void *op_result) + void *op_result) { + struct RPSPeer *peer = cls; struct GNUNET_RPS_Handle *h = op_result; + GNUNET_assert (NULL != peer); GNUNET_RPS_disconnect (h); + peer->rps_handle = NULL; } @@ -644,13 +696,21 @@ rps_disconnect_adapter (void *cls, static int default_eval_cb (void) { - return evaluate (rps_peers, num_peers, 1); + return evaluate (); } static int no_eval (void) { - return 1; + return 0; +} + +/** + * Initialise given RPSPeer + */ +static void default_init_peer (struct RPSPeer *rps_peer) +{ + rps_peer->num_ids_to_request = 1; } /** @@ -665,22 +725,34 @@ default_reply_handle (void *cls, uint64_t n, const struct GNUNET_PeerIdentity *recv_peers) { - struct RPSPeer *rps_peer = (struct RPSPeer *) cls; + struct RPSPeer *rps_peer; + struct PendingReply *pending_rep = (struct PendingReply *) cls; unsigned int i; + rps_peer = pending_rep->rps_peer; + GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head, + rps_peer->pending_rep_tail, + pending_rep); + rps_peer->num_pending_reps--; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "[%s] got %" PRIu64 " peers:\n", GNUNET_i2s (rps_peer->peer_id), n); - for (i = 0 ; i < n ; i++) + for (i = 0; i < n; i++) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%u: %s\n", i, GNUNET_i2s (&recv_peers[i])); - GNUNET_array_append (rps_peer->rec_ids, rps_peer->num_rec_ids, recv_peers[i]); + rps_peer->num_recv_ids++; + } + + if (0 == evaluate ()) + { + GNUNET_SCHEDULER_cancel (shutdown_task); + shutdown_task = GNUNET_SCHEDULER_add_now (&shutdown_op, NULL); } } @@ -691,24 +763,141 @@ static void request_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct RPSPeer *rps_peer = (struct RPSPeer *) cls; + struct RPSPeer *rps_peer; + struct PendingRequest *pending_req = (struct PendingRequest *) cls; + struct PendingReply *pending_rep; if (GNUNET_YES == in_shutdown) return; + rps_peer = pending_req->rps_peer; + GNUNET_assert (1 <= rps_peer->num_pending_reqs); + GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head, + rps_peer->pending_req_tail, + pending_req); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Requesting one peer\n"); + pending_rep = GNUNET_new (struct PendingReply); + pending_rep->rps_peer = rps_peer; + pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle, + 1, + cur_test_run.reply_handle, + pending_rep); + GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head, + rps_peer->pending_rep_tail, + pending_rep); + rps_peer->num_pending_reps++; + rps_peer->num_pending_reqs--; +} + +static void +cancel_pending_req (struct PendingRequest *pending_req) +{ + struct RPSPeer *rps_peer; + + rps_peer = pending_req->rps_peer; + GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head, + rps_peer->pending_req_tail, + pending_req); + rps_peer->num_pending_reqs--; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Cancelling pending request\n"); + GNUNET_SCHEDULER_cancel (pending_req->request_task); + GNUNET_free (pending_req); +} + +static void +cancel_request (struct PendingReply *pending_rep) +{ + struct RPSPeer *rps_peer; + + rps_peer = pending_rep->rps_peer; + GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head, + rps_peer->pending_rep_tail, + pending_rep); + rps_peer->num_pending_reps--; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Cancelling request\n"); + GNUNET_RPS_request_cancel (pending_rep->req_handle); + GNUNET_free (pending_rep); +} + +/** + * Cancel a request. + */ +static void +cancel_request_cb (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PendingReply *pending_rep; + struct RPSPeer *rps_peer = (struct RPSPeer *) cls; + + if (GNUNET_YES == in_shutdown) + return; + pending_rep = rps_peer->pending_rep_head; + GNUNET_assert (1 <= rps_peer->num_pending_reps); + cancel_request (pending_rep); +} - GNUNET_free (GNUNET_RPS_request_peers (rps_peer->rps_handle, - 1, - cur_test_run.reply_handle, - rps_peer)); - //rps_peer->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle, 1, handle_reply, rps_peer); + +/** + * Schedule requests for peer @a rps_peer that have neither been scheduled, nor + * issued, nor replied + */ +void +schedule_missing_requests (struct RPSPeer *rps_peer) +{ + unsigned int i; + struct PendingRequest *pending_req; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Scheduling %u - %u missing requests\n", + rps_peer->num_ids_to_request, + rps_peer->num_pending_reqs + rps_peer->num_pending_reps); + GNUNET_assert (rps_peer->num_pending_reqs + rps_peer->num_pending_reps <= + rps_peer->num_ids_to_request); + for (i = rps_peer->num_pending_reqs + rps_peer->num_pending_reps; + i < rps_peer->num_ids_to_request; i++) + { + pending_req = GNUNET_new (struct PendingRequest); + pending_req->rps_peer = rps_peer; + pending_req->request_task = GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + cur_test_run.request_interval * i), + request_peers, + pending_req); + GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_req_head, + rps_peer->pending_req_tail, + pending_req); + rps_peer->num_pending_reqs++; + } } +void +cancel_pending_req_rep (struct RPSPeer *rps_peer) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Cancelling all (pending) requests.\n"); + while (NULL != rps_peer->pending_req_head) + cancel_pending_req (rps_peer->pending_req_head); + GNUNET_assert (0 == rps_peer->num_pending_reqs); + while (NULL != rps_peer->pending_rep_head) + cancel_request (rps_peer->pending_rep_head); + GNUNET_assert (0 == rps_peer->num_pending_reps); +} /*********************************** * MALICIOUS ***********************************/ + +/** + * Initialise only non-mal RPSPeers + */ +static void mal_init_peer (struct RPSPeer *rps_peer) +{ + if (rps_peer->index >= round (portion * num_peers)) + rps_peer->num_ids_to_request = 1; +} + static void mal_pre (void *cls, struct GNUNET_RPS_Handle *h) { @@ -716,8 +905,8 @@ mal_pre (void *cls, struct GNUNET_RPS_Handle *h) uint32_t num_mal_peers; struct RPSPeer *rps_peer = (struct RPSPeer *) cls; - GNUNET_assert (1 >= portion - && 0 < portion); + GNUNET_assert ( (1 >= portion) && + (0 < portion) ); num_mal_peers = round (portion * num_peers); if (rps_peer->index < num_mal_peers) @@ -728,7 +917,8 @@ mal_pre (void *cls, struct GNUNET_RPS_Handle *h) GNUNET_i2s (rps_peer->peer_id), num_mal_peers); - GNUNET_RPS_act_malicious (h, mal_type, num_mal_peers, rps_peer_ids); + GNUNET_RPS_act_malicious (h, mal_type, num_mal_peers, + rps_peer_ids, target_peer); } #endif /* ENABLE_MALICIOUS */ } @@ -739,8 +929,8 @@ mal_cb (struct RPSPeer *rps_peer) uint32_t num_mal_peers; #ifdef ENABLE_MALICIOUS - GNUNET_assert (1 >= portion - && 0 < portion); + GNUNET_assert ( (1 >= portion) && + (0 < portion) ); num_mal_peers = round (portion * num_peers); if (rps_peer->index >= num_mal_peers) @@ -748,23 +938,11 @@ mal_cb (struct RPSPeer *rps_peer) it's not sampling */ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2), seed_peers, rps_peer); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10), - request_peers, rps_peer); + schedule_missing_requests (rps_peer); } #endif /* ENABLE_MALICIOUS */ } -static int -mal_eval (void) -{ - unsigned int num_mal_peers; - - num_mal_peers = round (num_peers * portion); - return evaluate (&rps_peers[num_mal_peers], - num_peers - (num_mal_peers), - 1); -} - /*********************************** * SINGLE_REQUEST @@ -772,8 +950,7 @@ mal_eval (void) static void single_req_cb (struct RPSPeer *rps_peer) { - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), - request_peers, rps_peer); + schedule_missing_requests (rps_peer); } /*********************************** @@ -782,10 +959,7 @@ single_req_cb (struct RPSPeer *rps_peer) static void delay_req_cb (struct RPSPeer *rps_peer) { - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), - request_peers, rps_peer); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10), - request_peers, rps_peer); + schedule_missing_requests (rps_peer); } /*********************************** @@ -794,8 +968,9 @@ delay_req_cb (struct RPSPeer *rps_peer) static void seed_cb (struct RPSPeer *rps_peer) { - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10), - seed_peers, rps_peer); + GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10), + seed_peers, rps_peer); } /*********************************** @@ -805,6 +980,9 @@ static void seed_big_cb (struct RPSPeer *rps_peer) { // TODO test seeding > GNUNET_SERVER_MAX_MESSAGE_SIZE peers + GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2), + seed_peers_big, rps_peer); } /*********************************** @@ -822,10 +1000,10 @@ single_peer_seed_cb (struct RPSPeer *rps_peer) static void seed_req_cb (struct RPSPeer *rps_peer) { - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2), - seed_peers, rps_peer); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15), - request_peers, rps_peer); + GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2), + seed_peers, rps_peer); + schedule_missing_requests (rps_peer); } //TODO start big mal @@ -836,16 +1014,130 @@ seed_req_cb (struct RPSPeer *rps_peer) static void req_cancel_cb (struct RPSPeer *rps_peer) { - // TODO + schedule_missing_requests (rps_peer); + GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + (cur_test_run.request_interval + 1)), + cancel_request_cb, rps_peer); } /*********************************** * PROFILER ***********************************/ + +/** + * Callback to be called when RPS service is started or stopped at peers + * + * @param cls NULL + * @param op the operation handle + * @param emsg NULL on success; otherwise an error description + */ static void -churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +churn_cb (void *cls, + struct GNUNET_TESTBED_Operation *op, + const char *emsg) +{ + // FIXME + struct OpListEntry *entry = cls; + + GNUNET_TESTBED_operation_done (entry->op); + if (NULL != emsg) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start/stop RPS at a peer\n"); + GNUNET_SCHEDULER_shutdown (); + return; + } + GNUNET_assert (0 != entry->delta); + + num_peers_online += entry->delta; + + if (0 > entry->delta) + { /* Peer hopefully just went offline */ + if (GNUNET_YES != rps_peers[entry->index].online) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "peer %s was expected to go offline but is still marked as online\n", + GNUNET_i2s (rps_peers[entry->index].peer_id)); + GNUNET_break (0); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "peer %s probably went offline as expected\n", + GNUNET_i2s (rps_peers[entry->index].peer_id)); + } + rps_peers[entry->index].online = GNUNET_NO; + } + + else if (0 < entry->delta) + { /* Peer hopefully just went online */ + if (GNUNET_NO != rps_peers[entry->index].online) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "peer %s was expected to go online but is still marked as offline\n", + GNUNET_i2s (rps_peers[entry->index].peer_id)); + GNUNET_break (0); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "peer %s probably went online as expected\n", + GNUNET_i2s (rps_peers[entry->index].peer_id)); + if (NULL != cur_test_run.pre_test) + { + cur_test_run.pre_test (&rps_peers[entry->index], + rps_peers[entry->index].rps_handle); + schedule_missing_requests (&rps_peers[entry->index]); + } + } + rps_peers[entry->index].online = GNUNET_YES; + } + + GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry); + GNUNET_free (entry); + //if (num_peers_in_round[current_round] == peers_running) + // run_round (); +} + +static void +manage_service_wrapper (unsigned int i, unsigned int j, int delta, + double prob_go_on_off) { struct OpListEntry *entry; + uint32_t prob; + + prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + UINT32_MAX); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%u. selected peer (%u: %s) is %s.\n", + i, + j, + GNUNET_i2s (rps_peers[j].peer_id), + (0 > delta) ? "online" : "offline"); + if (prob < prob_go_on_off * UINT32_MAX) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s goes %s\n", + GNUNET_i2s (rps_peers[j].peer_id), + (0 > delta) ? "offline" : "online"); + + if (0 > delta) + cancel_pending_req_rep (&rps_peers[j]); + entry = make_oplist_entry (); + entry->delta = delta; + entry->index = j; + entry->op = GNUNET_TESTBED_peer_manage_service (NULL, + testbed_peers[j], + "rps", + &churn_cb, + entry, + (0 > delta) ? 0 : 1); + } +} + +static void +churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ unsigned int i; unsigned int j; double portion_online; @@ -853,7 +1145,6 @@ churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) double prob_go_offline; double portion_go_online; double portion_go_offline; - uint32_t prob; /* Compute the probability for an online peer to go offline * this round */ @@ -878,65 +1169,21 @@ churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) (unsigned int) num_peers); /* Go over 50% randomly chosen peers */ - for (i = 0 ; i < .5 * num_peers ; i++) + for (i = 0; i < .5 * num_peers; i++) { j = permut[i]; /* If online, shut down with certain probability */ if (GNUNET_YES == rps_peers[j].online) { - prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - UINT32_MAX); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%u. selected peer (%u: %s) is online.\n", - i, - j, - GNUNET_i2s (rps_peers[j].peer_id)); - if (prob < prob_go_offline * UINT32_MAX) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s goes offline\n", - GNUNET_i2s (rps_peers[j].peer_id)); - - entry = make_oplist_entry (); - entry->delta = -1; - entry->index = j; - entry->op = GNUNET_TESTBED_peer_manage_service (NULL, - testbed_peers[j], - "rps", - &churn_cb, - entry, - 0); - } - } - - /* If offline, restart with certain probability */ - else if (GNUNET_NO == rps_peers[j].online) - { - prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - UINT32_MAX); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%u. selected peer (%u: %s) is offline.\n", - i, - j, - GNUNET_i2s (rps_peers[j].peer_id)); - if (prob < .66 * UINT32_MAX) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s goes online\n", - GNUNET_i2s (rps_peers[j].peer_id)); - - entry = make_oplist_entry (); - entry->delta = 1; - entry->index = j; - entry->op = GNUNET_TESTBED_peer_manage_service (NULL, - testbed_peers[j], - "rps", - &churn_cb, - entry, - 1); - } - } + manage_service_wrapper (i, j, -1, prob_go_offline); + } + + /* If offline, restart with certain probability */ + else if (GNUNET_NO == rps_peers[j].online) + { + manage_service_wrapper (i, j, 1, 0.66); + } } GNUNET_free (permut); @@ -948,21 +1195,13 @@ churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } -static void -profiler_pre (void *cls, struct GNUNET_RPS_Handle *h) +/** + * Initialise given RPSPeer + */ +static void profiler_init_peer (struct RPSPeer *rps_peer) { - //churn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - // 10), - // churn, NULL); - mal_pre (cls, h); - - /* if (NULL == churn_task) - { - churn_task = GNUNET_SCHEDULER_add_delayed ( - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10), - churn, - NULL); - } */ + if (num_peers - 1 == rps_peer->index) + rps_peer->num_ids_to_request = cur_test_run.num_requests; } @@ -978,52 +1217,46 @@ profiler_reply_handle (void *cls, uint64_t n, const struct GNUNET_PeerIdentity *recv_peers) { - struct RPSPeer *rps_peer = (struct RPSPeer *) cls; + struct RPSPeer *rps_peer; struct RPSPeer *rcv_rps_peer; char *file_name; char *file_name_dh; unsigned int i; + struct PendingReply *pending_rep = (struct PendingReply *) cls; + rps_peer = pending_rep->rps_peer; file_name = "/tmp/rps/received_ids"; file_name_dh = "/tmp/rps/diehard_input"; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "[%s] got %" PRIu64 " peers:\n", GNUNET_i2s (rps_peer->peer_id), n); - - for (i = 0 ; i < n ; i++) + for (i = 0; i < n; i++) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%u: %s\n", i, GNUNET_i2s (&recv_peers[i])); - - /* GNUNET_array_append (rps_peer->rec_ids, rps_peer->num_rec_ids, recv_peers[i]); */ tofile (file_name, "%s\n", GNUNET_i2s_full (&recv_peers[i])); - rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, &recv_peers[i]); - tofile (file_name_dh, "%" PRIu32 "\n", (uint32_t) rcv_rps_peer->index); } + default_reply_handle (cls, n, recv_peers); } static void profiler_cb (struct RPSPeer *rps_peer) { - uint32_t i; - - /* Churn only at peers that do not request peers for evaluation */ - if (NULL == churn_task && - rps_peer->index != num_peers - 2) + /* Start churn */ + if (NULL == churn_task) { churn_task = GNUNET_SCHEDULER_add_delayed ( - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10), + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), churn, NULL); } @@ -1031,17 +1264,8 @@ profiler_cb (struct RPSPeer *rps_peer) /* Only request peer ids at one peer. * (It's the before-last because last one is target of the focussed attack.) */ - if (rps_peer->index == num_peers - 2) - { - for (i = 0 ; i < cur_test_run.num_requests ; i++) - { - GNUNET_SCHEDULER_add_delayed ( - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - cur_test_run.request_interval * i), - request_peers, - rps_peer); - } - } + if (eval_peer == rps_peer) + schedule_missing_requests (rps_peer); } /** @@ -1092,7 +1316,7 @@ profiler_eval (void) GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Scan of directory failed\n"); } - return 0; + return evaluate (); } @@ -1123,45 +1347,46 @@ run (void *cls, { unsigned int i; struct OpListEntry *entry; + uint32_t num_mal_peers; testbed_peers = peers; num_peers_online = 0; - - for (i = 0 ; i < num_peers ; i++) + for (i = 0; i < num_peers; i++) { entry = make_oplist_entry (); entry->index = i; + rps_peers[i].index = i; + if (NULL != cur_test_run.init_peer) + cur_test_run.init_peer (&rps_peers[i]); entry->op = GNUNET_TESTBED_peer_get_information (peers[i], GNUNET_TESTBED_PIT_IDENTITY, &info_cb, entry); } - - // This seems not to work - //if (NULL != strstr (cur_test_run.name, "profiler")) - //{ - // churn_task = GNUNET_SCHEDULER_add_delayed ( - // GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), - // churn, - // NULL); - //} - + num_mal_peers = round (portion * num_peers); GNUNET_assert (num_peers == n_peers); - for (i = 0 ; i < n_peers ; i++) + for (i = 0; i < n_peers; i++) { rps_peers[i].index = i; - rps_peers[i].op = - GNUNET_TESTBED_service_connect (&rps_peers[i], - peers[i], - "rps", - &rps_connect_complete_cb, - &rps_peers[i], - &rps_connect_adapter, - &rps_disconnect_adapter, - &rps_peers[i]); + if ( (rps_peers[i].num_recv_ids < rps_peers[i].num_ids_to_request) || + (i < num_mal_peers) ) + { + rps_peers[i].op = + GNUNET_TESTBED_service_connect (&rps_peers[i], + peers[i], + "rps", + &rps_connect_complete_cb, + &rps_peers[i], + &rps_connect_adapter, + &rps_disconnect_adapter, + &rps_peers[i]); + } } - GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_task, NULL); + + if (NULL != churn_task) + GNUNET_SCHEDULER_cancel (churn_task); + shutdown_task = GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_op, NULL); } @@ -1177,20 +1402,20 @@ main (int argc, char *argv[]) { int ret_value; + num_peers = 5; cur_test_run.name = "test-rps-default"; + cur_test_run.init_peer = default_init_peer; cur_test_run.pre_test = NULL; cur_test_run.reply_handle = default_reply_handle; cur_test_run.eval_cb = default_eval_cb; churn_task = NULL; - - num_peers = 5; timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30); if (strstr (argv[0], "malicious") != NULL) { cur_test_run.pre_test = mal_pre; cur_test_run.main_test = mal_cb; - cur_test_run.eval_cb = mal_eval; + cur_test_run.init_peer = mal_init_peer; if (strstr (argv[0], "_1") != NULL) { @@ -1229,8 +1454,11 @@ main (int argc, char *argv[]) else if (strstr (argv[0], "_seed_big") != NULL) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test seeding (num_peers > GNUNET_SERVER_MAX_MESSAGE_SIZE)\n"); + num_peers = 1; cur_test_run.name = "test-rps-seed-big"; cur_test_run.main_test = seed_big_cb; + cur_test_run.eval_cb = no_eval; + timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10); } else if (strstr (argv[0], "_single_peer_seed") != NULL) @@ -1259,31 +1487,40 @@ main (int argc, char *argv[]) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test cancelling a request\n"); cur_test_run.name = "test-rps-req-cancel"; + num_peers = 1; cur_test_run.main_test = req_cancel_cb; + cur_test_run.eval_cb = no_eval; + timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10); } else if (strstr (argv[0], "profiler") != NULL) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "This is the profiler\n"); cur_test_run.name = "test-rps-profiler"; + num_peers = 10; mal_type = 3; - cur_test_run.pre_test = profiler_pre; + cur_test_run.init_peer = profiler_init_peer; + cur_test_run.pre_test = mal_pre; cur_test_run.main_test = profiler_cb; cur_test_run.reply_handle = profiler_reply_handle; cur_test_run.eval_cb = profiler_eval; cur_test_run.request_interval = 2; - cur_test_run.num_requests = 50; - - num_peers = 50; + cur_test_run.num_requests = 5; + timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90); + /* 'Clean' directory */ (void) GNUNET_DISK_directory_remove ("/tmp/rps/"); GNUNET_DISK_directory_create ("/tmp/rps/"); - timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90); } rps_peers = GNUNET_new_array (num_peers, struct RPSPeer); - rps_peer_ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers, GNUNET_NO); + rps_peer_ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); + if ( (2 == mal_type) || + (3 == mal_type)) + target_peer = &rps_peer_ids[num_peers - 2]; + if (profiler_eval == cur_test_run.eval_cb) + eval_peer = &rps_peers[num_peers - 1]; ok = 1; (void) GNUNET_TESTBED_test_run (cur_test_run.name, @@ -1293,11 +1530,9 @@ main (int argc, char *argv[]) &run, NULL); ret_value = cur_test_run.eval_cb(); - GNUNET_free (rps_peers ); GNUNET_free (rps_peer_ids); GNUNET_CONTAINER_multipeermap_destroy (peer_map); - return ret_value; }