X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fdht%2Fgnunet_dht_profiler.c;h=9fa9f33c471168c205041b6ba98d9c3a3c8bfd6e;hb=5ac199dcc8af3a7052086087d36feda38bc4b4de;hp=6ed2075441da10084ffcb89432dbaf5128d8420a;hpb=2a79f975e0deacbd57ca97f22fb66d68cae1929a;p=oweals%2Fgnunet.git diff --git a/src/dht/gnunet_dht_profiler.c b/src/dht/gnunet_dht_profiler.c index 6ed207544..9fa9f33c4 100644 --- a/src/dht/gnunet_dht_profiler.c +++ b/src/dht/gnunet_dht_profiler.c @@ -38,7 +38,14 @@ /** * Number of peers which should perform a PUT out of 100 peers */ -#define PUT_PROBABILITY 50 +#define PUT_PROBABILITY 100 + +/** + * Percentage of peers that should act maliciously. + * These peers will never start PUT/GET request. + * n_active and n_malicious should not intersect. + */ +#define MALICIOUS_PEERS 0 /** * Configuration @@ -78,6 +85,30 @@ struct Context }; +#if ENABLE_MALICIOUS +/** + * Context for a peer which should act maliciously. + */ +struct MaliciousContext +{ + /** + * The linked peer context + */ + struct Context *ctx; + + /** + * Handler to the DHT service + */ + struct GNUNET_DHT_Handle *dht; +}; + +/** + * List of all the malicious peers contexts. + */ +struct Context **malicious_peer_contexts = NULL; + +#endif + /** * Context for a peer which actively does DHT PUT/GET */ @@ -160,6 +191,13 @@ static struct GNUNET_TIME_Relative timeout; */ static unsigned int num_peers; +#if ENABLE_MALICIOUS +/** + * Number or malicious peers. + */ +static unsigned int n_malicious; +#endif + /** * Number of active peers */ @@ -200,6 +238,126 @@ static unsigned int n_gets_ok; */ static unsigned int n_gets_fail; +/** + * Replication degree + */ +static unsigned int replication; + +/** + * Number of times we try to find the successor circle formation + */ +static unsigned int max_searches; + +/** + * Testbed Operation (to get stats). + */ +static struct GNUNET_TESTBED_Operation *bandwidth_stats_op; + +/** + * To get successor stats. + */ +static struct GNUNET_TESTBED_Operation *successor_stats_op; + +/** + * Testbed peer handles. + */ +static struct GNUNET_TESTBED_Peer **testbed_handles; + +/** + * Total number of messages sent by peer. + */ +static uint64_t outgoing_bandwidth; + +/** + * Total number of messages received by peer. + */ +static uint64_t incoming_bandwidth; + +/** + * Average number of hops taken to do put. + */ +static double average_put_path_length; + +/** + * Average number of hops taken to do get. + */ +static double average_get_path_length; + +/** + * Total put path length across all peers. + */ +static unsigned int total_put_path_length; + +/** + * Total get path length across all peers. + */ +static unsigned int total_get_path_length; + +/** + * Hashmap to store pair of peer and its corresponding successor. + */ +static struct GNUNET_CONTAINER_MultiHashMap *successor_peer_hashmap; + +/** + * Key to start the lookup on successor_peer_hashmap. + */ +static struct GNUNET_HashCode *start_key; + +/** + * Flag used to get the start_key. + */ +static int flag = 0; + +/** + * Task to collect peer and its current successor statistics. + */ +static GNUNET_SCHEDULER_TaskIdentifier successor_stats_task; + +/** + * Closure for successor_stats_task. + */ +struct Collect_Stat_Context +{ + /** + * Current Peer Context. + */ + struct Context *service_connect_ctx; + + /** + * Testbed operation acting on this peer + */ + struct GNUNET_TESTBED_Operation *op; +}; + +/** + * List of all the peers contexts. + */ +struct Context **peer_contexts = NULL; + +/** + * Counter to keep track of peers added to peer_context lists. + */ +static int peers_started = 0; + + +/** + * Should we do a PUT (mode = 0) or GET (mode = 1); + */ +static enum +{ + MODE_PUT = 0, + + MODE_GET = 1 +} mode; + +/** + * Task that collects successor statistics from all the peers. + * @param cls + * @param tc + */ +static void +collect_stats (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + /** * Shutdown task. Cleanup all resources and operations. @@ -218,7 +376,7 @@ do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) for (cnt=0; cnt < num_peers; cnt++) { if (NULL != a_ctx[cnt].op) - GNUNET_TESTBED_operation_done (a_ctx[cnt].op); + GNUNET_TESTBED_operation_done (a_ctx[cnt].op); //FIXME: assertion fails. /* Cleanup active context if this peer is an active peer */ ac = a_ctx[cnt].ac; @@ -236,10 +394,64 @@ do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_free (a_ctx); a_ctx = NULL; } + if(NULL != bandwidth_stats_op) + GNUNET_TESTBED_operation_done (bandwidth_stats_op); + bandwidth_stats_op = NULL; GNUNET_free_non_null (a_ac); } +/** + * Stats callback. Finish the stats testbed operation and when all stats have + * been iterated, shutdown the test. + * + * @param cls closure + * @param op the operation that has been finished + * @param emsg error message in case the operation has failed; will be NULL if + * operation has executed successfully. + */ +static void +bandwidth_stats_cont (void *cls, + struct GNUNET_TESTBED_Operation *op, + const char *emsg) +{ + INFO ("# Outgoing bandwidth: %u\n", outgoing_bandwidth); + INFO ("# Incoming bandwidth: %u\n", incoming_bandwidth); + GNUNET_SCHEDULER_shutdown (); +} + + +/** + * Process statistic values. + * + * @param cls closure + * @param peer the peer the statistic belong to + * @param subsystem name of subsystem that created the statistic + * @param name the name of the datum + * @param value the current value + * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not + * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration + */ +static int +bandwidth_stats_iterator (void *cls, + const struct GNUNET_TESTBED_Peer *peer, + const char *subsystem, + const char *name, + uint64_t value, + int is_persistent) +{ + static const char *s_sent = "# Bytes transmitted to other peers"; + static const char *s_recv = "# Bytes received from other peers"; + + if (0 == strncmp (s_sent, name, strlen (s_sent))) + outgoing_bandwidth = outgoing_bandwidth + value; + else if (0 == strncmp(s_recv, name, strlen (s_recv))) + incoming_bandwidth = incoming_bandwidth + value; + + return GNUNET_OK; +} + + static void summarize () { @@ -249,7 +461,19 @@ summarize () INFO ("# GETS made: %u\n", n_gets); INFO ("# GETS succeeded: %u\n", n_gets_ok); INFO ("# GETS failed: %u\n", n_gets_fail); - GNUNET_SCHEDULER_shutdown (); + INFO ("# average_put_path_length: %f\n", average_put_path_length); + INFO ("# average_get_path_length: %f\n", average_get_path_length); + + if (NULL == testbed_handles) + { + INFO ("No peers found\n"); + return; + } + /* Collect Stats*/ + bandwidth_stats_op = GNUNET_TESTBED_get_statistics (n_active, testbed_handles, + "dht", NULL, + bandwidth_stats_iterator, + bandwidth_stats_cont, NULL); } @@ -269,13 +493,15 @@ cancel_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_assert (NULL != ac->dht_get); GNUNET_DHT_get_stop (ac->dht_get); ac->dht_get = NULL; + n_gets_fail++; + GNUNET_assert (NULL != ctx->op); GNUNET_TESTBED_operation_done (ctx->op); ctx->op = NULL; - n_gets_fail++; /* If profiling is complete, summarize */ if (n_active == n_gets_fail + n_gets_ok) summarize (); + } @@ -321,12 +547,20 @@ get_iter (void *cls, ac->dht_get = NULL; GNUNET_SCHEDULER_cancel (ac->delay_task); ac->delay_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_assert (NULL != ctx->op); GNUNET_TESTBED_operation_done (ctx->op); ctx->op = NULL; + total_put_path_length = total_put_path_length + put_path_length; + total_get_path_length = total_get_path_length + get_path_length; + /* Summarize if profiling is complete */ if (n_active == n_gets_fail + n_gets_ok) + { + average_put_path_length = (double)total_put_path_length/(double)n_active; + average_get_path_length = (double)total_get_path_length/(double )n_active; summarize (); + } } @@ -354,6 +588,7 @@ delayed_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } get_ac->nrefs++; ac->get_ac = get_ac; + DEBUG ("Doing a DHT GET for data of size %u\n", get_ac->put_data_size); ac->dht_get = GNUNET_DHT_get_start (ac->dht, GNUNET_BLOCK_TYPE_TEST, &get_ac->hash, @@ -369,6 +604,13 @@ delayed_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } +/** + * Connect to DHT services of active peers + */ +static void +start_profiling(); + + /** * Queue up a delayed task for doing DHT GET * @@ -383,13 +625,18 @@ static void put_cont (void *cls, int success) { struct ActiveContext *ac = cls; + struct Context *ctx = ac->ctx; + struct GNUNET_TESTBED_Operation *op; ac->dht_put = NULL; if (success) n_puts_ok++; else n_puts_fail++; - ac->delay_task = GNUNET_SCHEDULER_add_delayed (delay, &delayed_get, ac); + GNUNET_assert (NULL != ctx); + op = ctx->op; + ctx->op = NULL; + GNUNET_TESTBED_operation_done (op); } @@ -415,7 +662,7 @@ delayed_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_CRYPTO_hash (ac->put_data, ac->put_data_size, &ac->hash); DEBUG ("Doing a DHT PUT with data of size %u\n", ac->put_data_size); ac->dht_put = GNUNET_DHT_put (ac->dht, &ac->hash, - 1, /* replication level */ + replication, GNUNET_DHT_RO_NONE, GNUNET_BLOCK_TYPE_TEST, ac->put_data_size, @@ -445,7 +692,7 @@ dht_connected (void *cls, struct ActiveContext *ac = cls; struct Context *ctx = ac->ctx; - GNUNET_assert (NULL != ctx); + GNUNET_assert (NULL != ctx); //FIXME: Fails GNUNET_assert (NULL != ctx->op); GNUNET_assert (ctx->op == op); ac->dht = (struct GNUNET_DHT_Handle *) ca_result; @@ -456,7 +703,15 @@ dht_connected (void *cls, ctx->op = NULL; return; } - ac->delay_task = GNUNET_SCHEDULER_add_delayed (delay, &delayed_put, ac); + switch (mode) + { + case MODE_PUT: + ac->delay_task = GNUNET_SCHEDULER_add_delayed (delay, &delayed_put, ac); + break; + case MODE_GET: + ac->delay_task = GNUNET_SCHEDULER_add_delayed (delay, &delayed_get, ac); + break; + } } @@ -492,12 +747,236 @@ dht_disconnect (void *cls, void *op_result) GNUNET_assert (NULL != ac->dht); GNUNET_assert (ac->dht == op_result); GNUNET_DHT_disconnect (ac->dht); + ac->dht = NULL; n_dht--; - if (0 == n_dht) - GNUNET_SCHEDULER_shutdown (); + if (0 != n_dht) + return; + /* Start GETs if all PUTs have been made */ + if (MODE_PUT == mode) + { + mode = MODE_GET; + start_profiling (); + return; + } + GNUNET_SCHEDULER_shutdown (); +} + + +/** + * Connect to DHT services of active peers + */ +static void +start_profiling() +{ + unsigned int i; + DEBUG("GNUNET_TESTBED_service_connect \n"); + for(i = 0; i < n_active; i++) + { + struct ActiveContext *ac = &a_ac[i]; + ac->ctx->op = + GNUNET_TESTBED_service_connect (ac->ctx, + ac->ctx->peer, + "dht", + &dht_connected, ac, + &dht_connect, + &dht_disconnect, + ac); + } +} + +static unsigned int tries; + +/** + * Stats callback. Iterate over the hashmap and check if all th peers form + * a virtual ring topology. + * + * @param cls closure + * @param op the operation that has been finished + * @param emsg error message in case the operation has failed; will be NULL if + * operation has executed successfully. + */ +static void +successor_stats_cont (void *cls, + struct GNUNET_TESTBED_Operation *op, + const char *emsg) +{ + struct GNUNET_HashCode *val; + struct GNUNET_HashCode *start_val; + struct GNUNET_HashCode *key; + int count; + + /* Don't schedule the task till we are looking for circle here. */ + successor_stats_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_TESTBED_operation_done (successor_stats_op); + successor_stats_op = NULL; + + start_val = + (struct GNUNET_HashCode *) GNUNET_CONTAINER_multihashmap_get(successor_peer_hashmap, + start_key); + val = GNUNET_new(struct GNUNET_HashCode); + key = GNUNET_new(struct GNUNET_HashCode); + val = start_val; + for (count = 0; count < num_peers; count++) + { + key = val; + val = GNUNET_CONTAINER_multihashmap_get (successor_peer_hashmap, + key); + GNUNET_assert(NULL != val); + /* Remove the entry from hashmap. This is done to take care of loop. */ + if (GNUNET_NO == + GNUNET_CONTAINER_multihashmap_remove (successor_peer_hashmap, + key, val)) + { + DEBUG ("Failed to remove entry from hashmap\n"); + break; + } + /* If a peer has its own identity as its successor. */ + if (0 == memcmp(&key, &val, sizeof (struct GNUNET_HashCode))) + { + break; + } + } + + if ((start_val == val) && (count == num_peers)) + { + DEBUG("CIRCLE COMPLETED after %u tries", tries); + //FIXME: FREE HASHMAP. + if(GNUNET_SCHEDULER_NO_TASK == successor_stats_task) + start_profiling(); + return; + } + else + { + if (max_searches == ++tries) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Maximum tries %u exceeded while checking successor TOTAL TRIES %u" + " cirle formation. Exiting\n", + max_searches,tries); + //FIXME: FREE HASHMAP + if (GNUNET_SCHEDULER_NO_TASK != successor_stats_task) + { + successor_stats_task = GNUNET_SCHEDULER_NO_TASK; + } + if(GNUNET_SCHEDULER_NO_TASK == successor_stats_task) + { + start_profiling(); + } + + return; + } + + flag = 0; + successor_stats_task = GNUNET_SCHEDULER_add_delayed (delay, &collect_stats, cls); + } } +/** + * Process successor statistic values. + * + * @param cls closure + * @param peer the peer the statistic belong to + * @param subsystem name of subsystem that created the statistic + * @param name the name of the datum + * @param value the current value + * @param is_persistent GNUNET_YES if the value is persistent, GNUNET_NO if not + * @return GNUNET_OK to continue, GNUNET_SYSERR to abort iteration + */ +static int +successor_stats_iterator (void *cls, + const struct GNUNET_TESTBED_Peer *peer, + const char *subsystem, + const char *name, + uint64_t value, + int is_persistent) +{ + static const char *key_string = "XDHT"; + if (0 == strncmp (key_string, name, strlen (key_string))) + { + char *my_id_str; + char successor_str[13]; + char truncated_my_id_str[13]; + char truncated_successor_str[13]; + struct GNUNET_HashCode *my_id_key; + struct GNUNET_HashCode *succ_key; + + strtok((char *)name,":"); + my_id_str = strtok(NULL,":"); + + strncpy(truncated_my_id_str, my_id_str, 12); + truncated_my_id_str[12] = '\0'; + my_id_key = GNUNET_new(struct GNUNET_HashCode); + GNUNET_CRYPTO_hash (truncated_my_id_str, sizeof(truncated_my_id_str),my_id_key); + GNUNET_STRINGS_data_to_string(&value, sizeof(uint64_t), successor_str, 13); + strncpy(truncated_successor_str, successor_str, 12); + truncated_successor_str[12] ='\0'; + + succ_key = GNUNET_new(struct GNUNET_HashCode); + GNUNET_CRYPTO_hash (truncated_successor_str, sizeof(truncated_successor_str),succ_key); + + if (0 == flag) + { + start_key = my_id_key; + flag = 1; + } + /* FIXME: GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE do not free the value + which is replaced, need to free it. */ + GNUNET_CONTAINER_multihashmap_put (successor_peer_hashmap, + my_id_key, (void *)succ_key, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE); + } + return GNUNET_OK; +} + + +/* + * Task that collects peer and its corresponding successors. + * + * @param cls Closure (NULL). + * @param tc Task Context. + */ +static void +collect_stats (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + if ((GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason) != 0) + return; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Start collecting statistics...\n"); + GNUNET_assert(NULL != testbed_handles); + successor_stats_op = + GNUNET_TESTBED_get_statistics (num_peers, testbed_handles, + "dht", NULL, + successor_stats_iterator, + successor_stats_cont, cls); + + GNUNET_assert(NULL != successor_stats_op); +} + + +#if ENABLE_MALICIOUS +/** + * Set the malicious variable in peer malicious context. + */ +static void +set_malicious() +{ + unsigned int i; + DEBUG ("Setting %u peers malicious"); + for(i = 0; i < n_malicious; i++) + { + struct MaliciousContext *mc = &a_mc[i]; + mc->ctx->op = + GNUNET_TESTBED_service_connect (ac->ctx, + ac->ctx->peer, + "dht", + &dht_set_malicious, mc, + &dht_connect, + &dht_finish, + mc); + } +} +#endif /** * Callback called when DHT service on the peer is started * @@ -516,16 +995,23 @@ service_started (void *cls, GNUNET_assert (NULL != ctx); GNUNET_assert (NULL != ctx->op); GNUNET_TESTBED_operation_done (ctx->op); - ctx->op = NULL; - if (NULL == ctx->ac) - return; - /* FIXME: connect to the DHT service and wait before starting a PUT */ - ctx->op = GNUNET_TESTBED_service_connect (ctx, ctx->peer, - "dht", - &dht_connected, ctx->ac, - &dht_connect, - &dht_disconnect, - ctx->ac); + peers_started++; + DEBUG("Peers Started = %d; num_peers = %d \n", peers_started, num_peers); + if (GNUNET_SCHEDULER_NO_TASK == successor_stats_task && peers_started == num_peers) + { +#if ENABLE_MALICIOUS + set_malicious(); +#endif + DEBUG("successor_stats_task \n"); + struct Collect_Stat_Context *collect_stat_cls = GNUNET_new(struct Collect_Stat_Context); + collect_stat_cls->service_connect_ctx = cls; + collect_stat_cls->op = op; + successor_peer_hashmap = GNUNET_CONTAINER_multihashmap_create (num_peers, + GNUNET_NO); + successor_stats_task = GNUNET_SCHEDULER_add_delayed (delay, + &collect_stats, + collect_stat_cls); + } } @@ -549,7 +1035,8 @@ test_run (void *cls, { unsigned int cnt; unsigned int ac_cnt; - + + testbed_handles = peers; if (NULL == peers) { /* exit */ @@ -566,6 +1053,26 @@ test_run (void *cls, GNUNET_free (a_ctx); return; } + +#if ENABLE_MALICIOUS + + if(PUT_PROBABILITY + MALICIOUS_PEERS > 100) + { + DEBUG ("Reduce either number of malicious peer or active peers. "); + GNUNET_SCHEDULER_shutdown (); + GNUNET_free (a_ctx); + return; + } + + /* Select the peers which should act maliciously. */ + n_malicious = num_peers * MALICIOUS_PEERS / 100; + + /* Select n_malicious peers and ensure that those are not active peers. + keep all malicious peer at one place, and call act malicious for all + those peers. */ + +#endif + a_ac = GNUNET_malloc (n_active * sizeof (struct ActiveContext)); ac_cnt = 0; for (cnt = 0; cnt < num_peers && ac_cnt < n_active; cnt++) @@ -638,22 +1145,30 @@ main (int argc, char *const *argv) {'n', "peers", "COUNT", gettext_noop ("number of peers to start"), 1, &GNUNET_GETOPT_set_uint, &num_peers}, + {'s', "searches", "COUNT", + gettext_noop ("maximum number of times we try to search for successor circle formation (default is 1)"), + 1, &GNUNET_GETOPT_set_uint, &max_searches}, {'H', "hosts", "FILENAME", gettext_noop ("name of the file with the login information for the testbed"), 1, &GNUNET_GETOPT_set_string, &hosts_file}, {'d', "delay", "DELAY", gettext_noop ("delay for starting DHT PUT and GET"), 1, &GNUNET_GETOPT_set_relative_time, &delay}, + {'r', "replication", "DEGREE", + gettext_noop ("replication degree for DHT PUTs"), + 1, &GNUNET_GETOPT_set_uint, &replication}, {'t', "timeout", "TIMEOUT", gettext_noop ("timeout for DHT PUT and GET requests"), 1, &GNUNET_GETOPT_set_relative_time, &timeout}, GNUNET_GETOPT_OPTION_END }; + max_searches = 5; if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) return 2; - delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3); /* default delay */ - timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 3); /* default timeout */ + delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 20); /* default delay */ + timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1); /* default timeout */ + replication = 1; /* default replication */ rc = 0; if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "dht-profiler",