From b97fc3b79ac05184f0928af8071e76dcbd8f99ea Mon Sep 17 00:00:00 2001 From: Omar Tarabai Date: Mon, 22 Sep 2014 13:19:57 +0000 Subject: [PATCH] sensor: profiler updates --- src/sensor/gnunet-sensor-profiler.c | 144 +++++++++++++------ src/sensor/gnunet-service-sensor_reporting.c | 37 +++-- src/sensor/profiler.py | 57 +++++++- 3 files changed, 174 insertions(+), 64 deletions(-) diff --git a/src/sensor/gnunet-sensor-profiler.c b/src/sensor/gnunet-sensor-profiler.c index 4cd6b808d..3e25cb732 100644 --- a/src/sensor/gnunet-sensor-profiler.c +++ b/src/sensor/gnunet-sensor-profiler.c @@ -85,6 +85,19 @@ struct ConnectionContext }; +struct Split +{ + + struct Split *next; + + struct Split *prev; + + int p1; + + int p2; + +}; + /** * Name of the configuration file used @@ -126,6 +139,11 @@ static unsigned int sensors_interval = 0; */ static char *topology_file; +/** + * Path to topology file (Option -s) + */ +static char *split_file; + /** * Array of peer info for all peers */ @@ -176,11 +194,21 @@ static GNUNET_SCHEDULER_TaskIdentifier delayed_task = GNUNET_SCHEDULER_NO_TASK; */ static struct DisconnectionContext *dc_head; -/* +/** * Tail of list of disconnection contexts */ static struct DisconnectionContext *dc_tail; +/** + * Head of splits list + */ +static struct Split *split_head; + +/** + * Tail of splits list + */ +static struct Split *split_tail; + /** * Copy directory recursively @@ -417,7 +445,7 @@ disconnect_peers (struct PeerInfo *p1, struct PeerInfo *p2) */ static void overlay_connect_cb (void *cls, struct GNUNET_TESTBED_Operation *op, - const char *emsg) + const char *emsg) { struct ConnectionContext *cc = cls; @@ -449,7 +477,7 @@ connect_peers (struct PeerInfo *p1, struct PeerInfo *p2) { if ((dc->p1 == p1 && dc->p2 == p2) || (dc->p1 == p2 && dc->p2 == p1)) break; - dc = dc_head->next; + dc = dc->next; } if (NULL != dc) { @@ -458,10 +486,11 @@ connect_peers (struct PeerInfo *p1, struct PeerInfo *p2) } /* Connect peers using testbed */ cc = GNUNET_new (struct ConnectionContext); + cc->p1 = p1; cc->p2 = p2; - GNUNET_TESTBED_overlay_connect (cc, &overlay_connect_cb, cc, - p1->testbed_peer, p2->testbed_peer); + GNUNET_TESTBED_overlay_connect (cc, &overlay_connect_cb, cc, p1->testbed_peer, + p2->testbed_peer); } /*****************************************************************************/ @@ -552,8 +581,8 @@ sensor_dir_scanner (void *cls, const char *filename) GNUNET_CONFIGURATION_parse (sensor_cfg, filename)); GNUNET_CONFIGURATION_set_value_string (sensor_cfg, file_basename, "COLLECTION_POINT", - GNUNET_i2s_full (&all_peers_info[0]. - peer_id)); + GNUNET_i2s_full (&all_peers_info + [0].peer_id)); if (sensors_interval > 0) { GNUNET_CONFIGURATION_set_value_number (sensor_cfg, file_basename, @@ -716,28 +745,18 @@ peerstore_disconnect_adapter (void *cls, void *op_result) * Prompty the user to reconnect two peers */ static void -prompt_peer_reconnection (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +prompt_peer_reconnection (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - int p1; - int p2; - char line[10]; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Connect peers (e.g. '0,2') or empty line to execute:\n"); - if (NULL == fgets (line, sizeof (line), stdin) || 1 == strlen (line)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Continuing.\n"); - return; - } - if (2 != sscanf (line, "%d,%d", &p1, &p2) || p1 >= num_peers || - p2 >= num_peers || p1 < 0 || p2 < 0 || p1 == p2) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Invalid input.\n"); - prompt_peer_reconnection (NULL, NULL); - return; - } - connect_peers (&all_peers_info[p1], &all_peers_info[p2]); - prompt_peer_reconnection (NULL, NULL); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting one link.\n"); + connect_peers (&all_peers_info[split_head->p1], + &all_peers_info[split_head->p2]); + GNUNET_SCHEDULER_cancel (shutdown_task); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Shutting down in 5 mins.\n"); + shutdown_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_MINUTES, 5), do_shutdown, + NULL); } @@ -747,27 +766,19 @@ prompt_peer_reconnection (void *cls, const struct GNUNET_SCHEDULER_TaskContext * static void prompt_peer_disconnection () { - int p1; - int p2; - char line[10]; + struct Split *s; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Disconnect peers (e.g. '0,2') or empty line to execute:\n"); - if (NULL == fgets (line, sizeof (line), stdin) || 1 == strlen (line)) + s = split_head; + while (NULL != s) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Will prompt for reconnection in 1 min.\n"); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 1) ,&prompt_peer_reconnection, NULL); - return; + disconnect_peers (&all_peers_info[s->p1], &all_peers_info[s->p2]); + s = s->next; } - if (2 != sscanf (line, "%d,%d", &p1, &p2) || p1 >= num_peers || - p2 >= num_peers || p1 < 0 || p2 < 0 || p1 == p2) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Invalid input.\n"); - prompt_peer_disconnection (); - return; - } - disconnect_peers (&all_peers_info[p1], &all_peers_info[p2]); - prompt_peer_disconnection (); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Will prompt for reconnection in 1 min.\n"); + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_MINUTES, 5), + &prompt_peer_reconnection, NULL); } @@ -963,6 +974,44 @@ verify_args () } +/** + * Parse split file (name passed as parameter). + * Split file contains sequence of peer pairs to disconenct. + */ +static void +parse_split_file () +{ + uint64_t f_size; + char *splits; + char *ptr; + int p1; + int p2; + struct Split *s; + + GNUNET_assert (NULL != split_file); + GNUNET_assert (GNUNET_OK == + GNUNET_DISK_file_size (split_file, &f_size, GNUNET_NO, + GNUNET_YES)); + splits = malloc (f_size); + GNUNET_assert (f_size == GNUNET_DISK_fn_read (split_file, splits, f_size)); + ptr = splits; + while (ptr < (splits + f_size)) + { + GNUNET_assert (2 == sscanf (ptr, "%d,%d", &p1, &p2)); + s = GNUNET_new (struct Split); + + s->p1 = p1; + s->p2 = p2; + GNUNET_CONTAINER_DLL_insert_tail (split_head, split_tail, s); + while (ptr < (splits + f_size) && *ptr != '\n') + ptr++; + if (*ptr == '\n') + ptr++; + } + GNUNET_free (splits); +} + + /** * Actual main function. * @@ -986,6 +1035,7 @@ run (void *cls, char *const *args, const char *cf, cfg, "TESTBED", "OVERLAY_TOPOLOGY_FILE", topology_file); + parse_split_file (); shutdown_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &do_shutdown, NULL); @@ -1009,6 +1059,8 @@ main (int argc, char *const *argv) {'i', "sensors-interval", "INTERVAL", gettext_noop ("Change the interval of running sensors to given value"), GNUNET_YES, &GNUNET_GETOPT_set_uint, &sensors_interval}, + {'s', "split-file", "FILEPATH", gettext_noop ("Path to split file"), + GNUNET_YES, &GNUNET_GETOPT_set_filename, &split_file}, GNUNET_GETOPT_OPTION_END }; diff --git a/src/sensor/gnunet-service-sensor_reporting.c b/src/sensor/gnunet-service-sensor_reporting.c index 3b2c08e42..1c19b38f9 100644 --- a/src/sensor/gnunet-service-sensor_reporting.c +++ b/src/sensor/gnunet-service-sensor_reporting.c @@ -423,20 +423,33 @@ static void destroy_core_peer (struct CorePeer *corep) { struct AnomalyInfo *ai; + struct AnomalyReportingQueueItem *ar_item; - if (NULL != corep->mq) - { - GNUNET_MQ_destroy (corep->mq); - corep->mq = NULL; - } ai = ai_head; while (NULL != ai) { GNUNET_assert (NULL != ai->anomalous_neighbors); GNUNET_CONTAINER_multipeermap_remove_all (ai->anomalous_neighbors, corep->peer_id); + /* Remove the core peer from any reporting queues */ + ar_item = ai->reporting_queue_head; + while (NULL != ar_item) + { + if (ar_item->dest_mq == corep->mq) + { + GNUNET_CONTAINER_DLL_remove (ai->reporting_queue_head, + ai->reporting_queue_tail, ar_item); + break; + } + ar_item = ar_item->next; + } ai = ai->next; } + if (NULL != corep->mq) + { + GNUNET_MQ_destroy (corep->mq); + corep->mq = NULL; + } GNUNET_free (corep); } @@ -873,9 +886,9 @@ update_anomaly_report_pow_block (struct AnomalyInfo *ai) arm->anomalous = htons (ai->anomalous); arm->anomalous_neighbors = (0 == - neighborhood) ? 0 : ((float) GNUNET_CONTAINER_multipeermap_size (ai-> - anomalous_neighbors)) - / neighborhood; + neighborhood) ? 0 : ((float) + GNUNET_CONTAINER_multipeermap_size + (ai->anomalous_neighbors)) / neighborhood; timestamp = GNUNET_TIME_absolute_get (); ai->report_creation_cx = GNUNET_SENSOR_crypto_pow_sign (arm, @@ -968,8 +981,8 @@ handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other, my_anomaly_info = get_anomaly_info_by_sensor (sensor); GNUNET_assert (NULL != my_anomaly_info); peer_in_anomalous_list = - GNUNET_CONTAINER_multipeermap_contains (my_anomaly_info-> - anomalous_neighbors, other); + GNUNET_CONTAINER_multipeermap_contains + (my_anomaly_info->anomalous_neighbors, other); peer_anomalous = ntohs (arm->anomalous); LOG (GNUNET_ERROR_TYPE_DEBUG, "Received an anomaly update from neighbour `%s' (%d).\n", @@ -988,8 +1001,8 @@ handle_anomaly_report (void *cls, const struct GNUNET_PeerIdentity *other, if (GNUNET_NO == peer_in_anomalous_list) /* repeated negative report */ GNUNET_break_op (0); else - GNUNET_CONTAINER_multipeermap_remove_all (my_anomaly_info-> - anomalous_neighbors, other); + GNUNET_CONTAINER_multipeermap_remove_all + (my_anomaly_info->anomalous_neighbors, other); } /* This is important to create an updated block since the data changed */ update_anomaly_report_pow_block (my_anomaly_info); diff --git a/src/sensor/profiler.py b/src/sensor/profiler.py index 7b77b48bb..8e48ab838 100644 --- a/src/sensor/profiler.py +++ b/src/sensor/profiler.py @@ -16,6 +16,8 @@ def get_args(): parser = argparse.ArgumentParser(description="Sensor profiler") parser.add_argument('-p', '--peers', action='store', type=int, required=True, help='Number of peers to run') + parser.add_argument('-l', '--links', action='store', type=int, required=False, + help='Number of links to create') parser.add_argument('-i', '--sensors-interval', action='store', type=int, required=False, help='Change the interval of running sensors to given value') @@ -66,9 +68,22 @@ def draw_graph(): inc += 1 print 'Drawing graph to file: %s' % name plt.clf() + anomaly_lbls = {} + for i in range(len(graph.node)): + if node_colors[i] >= 1: + anomaly_lbls[i] = '\n\n\n' + str(node_colors[i] - 1) networkx.draw(graph, pos=pos, node_color=node_colors, with_labels=range(len(graph.node)), cmap=plt.cm.Reds, vmin=0, vmax=2) + networkx.draw_networkx_labels(graph, pos, anomaly_lbls) plt.savefig(name) +def peers_reconnected(p1, p2): + global graph + if p2 in graph[p1]: + print 'Link already exists' + return + graph.add_edge(p1, p2) + draw_graph() + def peers_disconnected(p1, p2): global graph print 'Disconnected peers %d and %d' % (p1, p2) @@ -83,7 +98,10 @@ def anomaly_report(report): if 0 == report['anomalous']: node_colors[report['peer']] = 0 else: - node_colors[report['peer']] = 1 + report['neighbors'] + clr = 1 + report['neighbors'] + if node_colors[report['peer']] >= clr: + return + node_colors[report['peer']] = clr draw_graph() def handle_profiler_line(line): @@ -99,9 +117,13 @@ def handle_profiler_line(line): parts = line.split('Anomaly report:') anomaly_report(eval(parts[1])) return + if 'Peer connection request sent' in line: # Peers reconnected + parts = line.split(':') + peers = parts[-1].split(',') + peers_reconnected(int(peers[0]), int(peers[1])) -def run_profiler(peers, topology_file, sensors_interval): - cmd1 = "GNUNET_FORCE_LOG='gnunet-sensor-profiler;;;;DEBUG' gnunet-sensor-profiler -p %d -t %s" % (peers, topology_file) +def run_profiler(peers, topology_file, sensors_interval, split_file): + cmd1 = "./gnunet-sensor-profiler -p %d -t %s -s %s" % (peers, topology_file, split_file) if sensors_interval: cmd1 += " -i %d" % sensors_interval cmd2 = "> log 2>&1" @@ -120,6 +142,22 @@ def run_profiler(peers, topology_file, sensors_interval): line += c os.remove('log') +def create_split(): + global graph + f = open('split', 'w+') + half_size = len(graph.node) / 2 + half1 = [] + half2 = [] + for n in graph.node: + if n < half_size: + half1.append(n) + else: + half2.append(n) + for e in graph.edges(): + if (e[0] in half1 and e[1] in half2) or (e[0] in half2 and e[1] in half1): + f.write('%d,%d\n' % (e[0], e[1])) + f.close() + def main(): args = vars(get_args()) num_peers = args['peers'] @@ -129,17 +167,24 @@ def main(): sensors_interval = None if 'sensors_interval' in args: sensors_interval = args['sensors_interval'] - #num_links = int(math.log(num_peers) * math.log(num_peers) * num_peers / 2) - num_links = int(math.log(num_peers) * num_peers) + if 'links' in args: + num_links = args['links'] + else: + #num_links = int(math.log(num_peers) * math.log(num_peers) * num_peers / 2) + num_links = int(math.log(num_peers) * num_peers) # Generate random topology generate_topology(num_peers, num_links) print 'Generated random topology with %d peers and %d links' % (num_peers, num_links) + # Create a file with links to cut to split the topology into two + create_split() # Create TESTBED topology file top_file = create_topology_file() print 'Created TESTBED topology file %s' % top_file draw_graph() # Run c profiler - run_profiler(num_peers, top_file, sensors_interval) + if os.path.isfile('log'): + os.remove('log') + run_profiler(num_peers, top_file, sensors_interval, 'split') if __name__ == "__main__": main() -- 2.25.1