From af4e2e306bd703958ab0b8de1ab25fcc0a528eea Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 19 Feb 2017 12:39:15 +0100 Subject: [PATCH] converting consensus service to new SERVICE API --- src/consensus/gnunet-service-consensus.c | 490 +++++++++++------------ 1 file changed, 234 insertions(+), 256 deletions(-) diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 64decc29e..16ca6a57f 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - Copyright (C) 2012, 2013 GNUnet e.V. + Copyright (C) 2012, 2013, 2017 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -451,7 +451,7 @@ struct ConsensusSession /** * Client that inhabits the session */ - struct GNUNET_SERVER_Client *client; + struct GNUNET_SERVICE_Client *client; /** * Queued messages to the client. @@ -509,11 +509,6 @@ static struct ConsensusSession *sessions_tail; */ static const struct GNUNET_CONFIGURATION_Handle *cfg; -/** - * Handle to the server for this service. - */ -static struct GNUNET_SERVER_Handle *srv; - /** * Peer that runs this service. */ @@ -528,9 +523,11 @@ struct GNUNET_STATISTICS_Handle *statistics; static void finish_task (struct TaskEntry *task); + static void run_ready_steps (struct ConsensusSession *session); + static const char * phasename (uint16_t phase) { @@ -652,36 +649,6 @@ debug_str_rfn_key (const struct RfnKey *rk) #endif /* GNUNET_EXTRA_LOGGING */ -/** - * Destroy a session, free all resources associated with it. - * - * @param session the session to destroy - */ -static void -destroy_session (struct ConsensusSession *session) -{ - GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session); - if (NULL != session->set_listener) - { - GNUNET_SET_listen_cancel (session->set_listener); - session->set_listener = NULL; - } - if (NULL != session->client_mq) - { - GNUNET_MQ_destroy (session->client_mq); - session->client_mq = NULL; - /* The MQ cleanup will also disconnect the underlying client. */ - session->client = NULL; - } - if (NULL != session->client) - { - GNUNET_SERVER_client_disconnect (session->client); - session->client = NULL; - } - GNUNET_free (session); -} - - /** * Send the final result set of the consensus to the client, element by * element. @@ -1511,12 +1478,14 @@ rfn_create (uint16_t size) } -void +#if UNUSED +static void diff_destroy (struct DiffEntry *diff) { GNUNET_CONTAINER_multihashmap_destroy (diff->changes); GNUNET_free (diff); } +#endif /** @@ -2328,45 +2297,48 @@ peer_id_cmp (const void *h1, const void *h2) /** * Create the sorted list of peers for the session, * add the local peer if not in the join message. + * + * @param session session to initialize + * @param join_msg join message with the list of peers participating at the end */ static void initialize_session_peer_list (struct ConsensusSession *session, - struct GNUNET_CONSENSUS_JoinMessage *join_msg) + const struct GNUNET_CONSENSUS_JoinMessage *join_msg) { - unsigned int local_peer_in_list; - uint32_t listed_peers; - const struct GNUNET_PeerIdentity *msg_peers; - unsigned int i; - - GNUNET_assert (NULL != join_msg); - - /* peers in the join message, may or may not include the local peer */ - listed_peers = ntohl (join_msg->num_peers); - - session->num_peers = listed_peers; + const struct GNUNET_PeerIdentity *msg_peers + = (const struct GNUNET_PeerIdentity *) &join_msg[1]; + int local_peer_in_list; - msg_peers = (struct GNUNET_PeerIdentity *) &join_msg[1]; + session->num_peers = ntohl (join_msg->num_peers); + /* Peers in the join message, may or may not include the local peer, + Add it if it is missing. */ local_peer_in_list = GNUNET_NO; - for (i = 0; i < listed_peers; i++) + for (unsigned int i = 0; i < session->num_peers; i++) { - if (0 == memcmp (&msg_peers[i], &my_peer, sizeof (struct GNUNET_PeerIdentity))) + if (0 == memcmp (&msg_peers[i], + &my_peer, + sizeof (struct GNUNET_PeerIdentity))) { local_peer_in_list = GNUNET_YES; break; } } - if (GNUNET_NO == local_peer_in_list) session->num_peers++; - session->peers = GNUNET_malloc (session->num_peers * sizeof (struct GNUNET_PeerIdentity)); - + session->peers = GNUNET_new_array (session->num_peers, + struct GNUNET_PeerIdentity); if (GNUNET_NO == local_peer_in_list) session->peers[session->num_peers - 1] = my_peer; - GNUNET_memcpy (session->peers, msg_peers, listed_peers * sizeof (struct GNUNET_PeerIdentity)); - qsort (session->peers, session->num_peers, sizeof (struct GNUNET_PeerIdentity), &peer_id_cmp); + GNUNET_memcpy (session->peers, + msg_peers, + ntohl (join_msg->num_peers) * sizeof (struct GNUNET_PeerIdentity)); + qsort (session->peers, + session->num_peers, + sizeof (struct GNUNET_PeerIdentity), + &peer_id_cmp); } @@ -2924,177 +2896,160 @@ construct_task_graph (struct ConsensusSession *session) } + /** - * Initialize the session, continue receiving messages from the owning client + * Check join message. * - * @param session the session to initialize - * @param join_msg the join message from the client + * @param cls session of client that sent the message + * @param m message sent by the client + * @return #GNUNET_OK if @a m is well-formed + */ +static int +check_client_join (void *cls, + const struct GNUNET_CONSENSUS_JoinMessage *m) +{ + uint32_t listed_peers = ntohl (m->num_peers); + + if ( (ntohs (m->header.size) - sizeof (*m)) != + listed_peers * sizeof (struct GNUNET_PeerIdentity)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Called when a client wants to join a consensus session. + * + * @param cls session of client that sent the message + * @param m message sent by the client */ static void -initialize_session (struct ConsensusSession *session, - struct GNUNET_CONSENSUS_JoinMessage *join_msg) +handle_client_join (void *cls, + const struct GNUNET_CONSENSUS_JoinMessage *m) { + struct ConsensusSession *session = cls; struct ConsensusSession *other_session; - initialize_session_peer_list (session, join_msg); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session with %u peers\n", session->num_peers); - compute_global_id (session, &join_msg->session_id); + initialize_session_peer_list (session, + m); + compute_global_id (session, + &m->session_id); /* Check if some local client already owns the session. It is only legal to have a session with an existing global id if all other sessions with this global id are finished.*/ - other_session = sessions_head; - while (NULL != other_session) + for (other_session = sessions_head; + NULL != other_session; + other_session = other_session->next) { - if ((other_session != session) && - (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, &other_session->global_id))) - { - //if (CONSENSUS_ROUND_FINISH != other_session->current_round) - //{ - // GNUNET_break (0); - // destroy_session (session); - // return; - //} + if ( (other_session != session) && + (0 == GNUNET_CRYPTO_hash_cmp (&session->global_id, + &other_session->global_id)) ) break; - } - other_session = other_session->next; } - session->conclude_deadline = GNUNET_TIME_absolute_ntoh (join_msg->deadline); - session->conclude_start = GNUNET_TIME_absolute_ntoh (join_msg->start); - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus with timeout %llums created\n", - (long long) (GNUNET_TIME_absolute_get_difference (session->conclude_start, session->conclude_deadline)).rel_value_us / 1000); - - session->local_peer_idx = get_peer_idx (&my_peer, session); + session->conclude_deadline + = GNUNET_TIME_absolute_ntoh (m->deadline); + session->conclude_start + = GNUNET_TIME_absolute_ntoh (m->start); + session->local_peer_idx = get_peer_idx (&my_peer, + session); GNUNET_assert (-1 != session->local_peer_idx); - session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, - &session->global_id, - set_listen_cb, session); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%d is the local peer\n", session->local_peer_idx); - session->setmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); - session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); - session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); - session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Joining consensus session %s containing %u peers as %u with timeout %s\n", + GNUNET_h2s (&m->session_id), + session->num_peers, + session->local_peer_idx, + GNUNET_STRINGS_relative_time_to_string + (GNUNET_TIME_absolute_get_difference (session->conclude_start, + session->conclude_deadline), + GNUNET_YES)); + + session->set_listener + = GNUNET_SET_listen (cfg, + GNUNET_SET_OPERATION_UNION, + &session->global_id, + &set_listen_cb, + session); + + session->setmap = GNUNET_CONTAINER_multihashmap_create (1, + GNUNET_NO); + session->taskmap = GNUNET_CONTAINER_multihashmap_create (1, + GNUNET_NO); + session->diffmap = GNUNET_CONTAINER_multihashmap_create (1, + GNUNET_NO); + session->rfnmap = GNUNET_CONTAINER_multihashmap_create (1, + GNUNET_NO); { struct SetEntry *client_set; + client_set = GNUNET_new (struct SetEntry); - client_set->h = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); + client_set->h = GNUNET_SET_create (cfg, + GNUNET_SET_OPERATION_UNION); client_set->key = ((struct SetKey) { SET_KIND_CURRENT, 0, 0 }); - put_set (session, client_set); + put_set (session, + client_set); } - session->peers_blacklisted = GNUNET_new_array (session->num_peers, int); + session->peers_blacklisted = GNUNET_new_array (session->num_peers, + int); /* Just construct the task graph, but don't run anything until the client calls conclude. */ construct_task_graph (session); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "session %s initialized\n", GNUNET_h2s (&session->global_id)); + GNUNET_SERVICE_client_continue (session->client); } -static struct ConsensusSession * -get_session_by_client (struct GNUNET_SERVER_Client *client) +static void +client_insert_done (void *cls) { - struct ConsensusSession *session; - - session = sessions_head; - while (NULL != session) - { - if (session->client == client) - return session; - session = session->next; - } - return NULL; + // FIXME: implement } /** - * Called when a client wants to join a consensus session. + * Called when a client performs an insert operation. * - * @param cls unused - * @param client client that sent the message - * @param m message sent by the client + * @param cls client handle + * @param msg message sent by the client + * @return #GNUNET_OK (always well-formed) */ -static void -client_join (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) -{ - struct ConsensusSession *session; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join message sent by client\n"); - - session = get_session_by_client (client); - if (NULL != session) - { - GNUNET_break (0); - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); - return; - } - session = GNUNET_new (struct ConsensusSession); - session->client = client; - session->client_mq = GNUNET_MQ_queue_for_server_client (client); - GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); - initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "join done\n"); -} - - -static void -client_insert_done (void *cls) +static int +check_client_insert (void *cls, + const struct GNUNET_CONSENSUS_ElementMessage *msg) { - // FIXME: implement + return GNUNET_OK; } /** * Called when a client performs an insert operation. * - * @param cls (unused) - * @param client client handle - * @param m message sent by the client + * @param cls client handle + * @param msg message sent by the client */ -void -client_insert (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) +static void +handle_client_insert (void *cls, + const struct GNUNET_CONSENSUS_ElementMessage *msg) { - struct ConsensusSession *session; - struct GNUNET_CONSENSUS_ElementMessage *msg; + struct ConsensusSession *session = cls; struct GNUNET_SET_Element *element; ssize_t element_size; struct GNUNET_SET_Handle *initial_set; - session = get_session_by_client (client); - - if (NULL == session) - { - GNUNET_break (0); - GNUNET_SERVER_client_disconnect (client); - return; - } - if (GNUNET_YES == session->conclude_started) { GNUNET_break (0); - GNUNET_SERVER_client_disconnect (client); + GNUNET_SERVICE_client_drop (session->client); return; } - - msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; element_size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); - if (element_size < 0) - { - GNUNET_break (0); - return; - } - element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); element->element_type = msg->element_type; element->size = element_size; @@ -3103,71 +3058,61 @@ client_insert (void *cls, { struct SetKey key = { SET_KIND_CURRENT, 0, 0 }; struct SetEntry *entry; - entry = lookup_set (session, &key); + + entry = lookup_set (session, + &key); GNUNET_assert (NULL != entry); initial_set = entry->h; } session->num_client_insert_pending++; - GNUNET_SET_add_element (initial_set, element, client_insert_done, session); + GNUNET_SET_add_element (initial_set, + element, + &client_insert_done, + session); #ifdef GNUNET_EXTRA_LOGGING { struct GNUNET_HashCode hash; - GNUNET_SET_element_hash (element, &hash); + GNUNET_SET_element_hash (element, + &hash); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n", + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "P%u: element %s added\n", session->local_peer_idx, GNUNET_h2s (&hash)); } #endif - GNUNET_free (element); - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_SERVICE_client_continue (session->client); } /** * Called when a client performs the conclude operation. * - * @param cls (unused) - * @param client client handle + * @param cls client handle * @param message message sent by the client */ static void -client_conclude (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +handle_client_conclude (void *cls, + const struct GNUNET_MessageHeader *message) { - struct ConsensusSession *session; - - session = get_session_by_client (client); - if (NULL == session) - { - /* client not found */ - GNUNET_break (0); - GNUNET_SERVER_client_disconnect (client); - return; - } + struct ConsensusSession *session = cls; if (GNUNET_YES == session->conclude_started) { /* conclude started twice */ GNUNET_break (0); - GNUNET_SERVER_client_disconnect (client); - destroy_session (session); + GNUNET_SERVICE_client_drop (session->client); return; } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "conclude requested\n"); - + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "conclude requested\n"); session->conclude_started = GNUNET_YES; - install_step_timeouts (session); run_ready_steps (session); - - - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_SERVICE_client_continue (session->client); } @@ -3179,82 +3124,115 @@ client_conclude (void *cls, static void shutdown_task (void *cls) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "shutting down\n"); - while (NULL != sessions_head) - destroy_session (sessions_head); - - GNUNET_STATISTICS_destroy (statistics, GNUNET_NO); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "shutting down\n"); + GNUNET_STATISTICS_destroy (statistics, + GNUNET_NO); + statistics = NULL; } /** - * Clean up after a client after it is - * disconnected (either by us or by itself) + * Start processing consensus requests. * - * @param cls closure, unused - * @param client the client to clean up after + * @param cls closure + * @param c configuration to use + * @param service the initialized service */ -void -handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) +static void +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *c, + struct GNUNET_SERVICE_Handle *service) { - struct ConsensusSession *session; - - session = get_session_by_client (client); - if (NULL == session) + cfg = c; + if (GNUNET_OK != + GNUNET_CRYPTO_get_peer_identity (cfg, + &my_peer)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Could not retrieve host identity\n"); + GNUNET_SCHEDULER_shutdown (); return; - // FIXME: destroy if we can + } + statistics = GNUNET_STATISTICS_create ("consensus", + cfg); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + NULL); } +/** + * Callback called when a client connects to the service. + * + * @param cls closure for the service + * @param c the new client that connected to the service + * @param mq the message queue used to send messages to the client + * @return @a c + */ +static void * +client_connect_cb (void *cls, + struct GNUNET_SERVICE_Client *c, + struct GNUNET_MQ_Handle *mq) +{ + struct ConsensusSession *session = GNUNET_new (struct ConsensusSession); + + session->client = c; + session->client_mq = mq; + GNUNET_CONTAINER_DLL_insert (sessions_head, + sessions_tail, + session); + return session; +} + /** - * Start processing consensus requests. + * Callback called when a client disconnected from the service * - * @param cls closure - * @param server the initialized server - * @param c configuration to use + * @param cls closure for the service + * @param c the client that disconnected + * @param internal_cls should be equal to @a c */ static void -run (void *cls, struct GNUNET_SERVER_Handle *server, - const struct GNUNET_CONFIGURATION_Handle *c) +client_disconnect_cb (void *cls, + struct GNUNET_SERVICE_Client *c, + void *internal_cls) { - static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { - {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, - sizeof (struct GNUNET_MessageHeader)}, - {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, - {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, - {NULL, NULL, 0, 0} - }; + struct ConsensusSession *session = internal_cls; - cfg = c; - srv = server; - if (GNUNET_OK != GNUNET_CRYPTO_get_peer_identity (cfg, &my_peer)) + if (NULL != session->set_listener) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n"); - GNUNET_break (0); - GNUNET_SCHEDULER_shutdown (); - return; + GNUNET_SET_listen_cancel (session->set_listener); + session->set_listener = NULL; } - statistics = GNUNET_STATISTICS_create ("consensus", cfg); - GNUNET_SERVER_add_handlers (server, server_handlers); - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); - GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); + GNUNET_CONTAINER_DLL_remove (sessions_head, + sessions_tail, + session); + GNUNET_free (session); } /** - * The main function for the consensus service. - * - * @param argc number of arguments from the command line - * @param argv command line arguments - * @return 0 ok, 1 on error + * Define "main" method using service macro. */ -int -main (int argc, char *const *argv) -{ - int ret; - ret = GNUNET_SERVICE_run (argc, argv, "consensus", GNUNET_SERVICE_OPTION_NONE, &run, NULL); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret); - return (GNUNET_OK == ret) ? 0 : 1; -} +GNUNET_SERVICE_MAIN +("consensus", + GNUNET_SERVICE_OPTION_NONE, + &run, + &client_connect_cb, + &client_disconnect_cb, + NULL, + GNUNET_MQ_hd_fixed_size (client_conclude, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, + struct GNUNET_MessageHeader, + NULL), + GNUNET_MQ_hd_var_size (client_insert, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, + struct GNUNET_CONSENSUS_ElementMessage, + NULL), + GNUNET_MQ_hd_var_size (client_join, + GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, + struct GNUNET_CONSENSUS_JoinMessage, + NULL), + GNUNET_MQ_handler_end ()); + +/* end of gnunet-service-consensus.c */ -- 2.25.1