From 0ad34d9fbb44b13f63056c3ec8020dfeb49582fc Mon Sep 17 00:00:00 2001 From: Davin McCall Date: Mon, 2 Jul 2018 10:43:58 +0100 Subject: [PATCH] Implement adding dependencies via control protocol. --- src/control.cc | 117 ++++++++++++++++++++++++++++++++++++ src/includes/control-cmds.h | 4 +- src/includes/control.h | 2 + src/includes/service.h | 31 +++++++++- 4 files changed, 151 insertions(+), 3 deletions(-) diff --git a/src/control.cc b/src/control.cc index 8373daa..39a26f2 100644 --- a/src/control.cc +++ b/src/control.cc @@ -1,3 +1,6 @@ +#include +#include + #include "control.h" #include "service.h" @@ -66,6 +69,9 @@ bool control_conn_t::process_packet() if (pktType == DINIT_CP_LISTSERVICES) { return list_services(); } + if (pktType == DINIT_CP_ADD_DEP) { + return add_service_dep(); + } else { // Unrecognized: give error response char outbuf[] = { DINIT_RP_BADREQ }; @@ -370,6 +376,117 @@ bool control_conn_t::list_services() } } +// check for value in a set +static inline bool contains(const int (&v)[3], int i) +{ + return std::find(std::begin(v), std::end(v), i) != std::end(v); +} + +bool control_conn_t::add_service_dep() +{ + // 1 byte packet type + // 1 byte dependency type + // handle: "from" + // handle: "to" + + constexpr int pkt_size = 2 + sizeof(handle_t) * 2; + + if (rbuf.get_length() < pkt_size) { + chklen = pkt_size; + return true; + } + + handle_t from_handle; + handle_t to_handle; + rbuf.extract((char *) &from_handle, 2, sizeof(from_handle)); + rbuf.extract((char *) &to_handle, 2 + sizeof(from_handle), sizeof(to_handle)); + + service_record *from_service = find_service_for_key(from_handle); + service_record *to_service = find_service_for_key(to_handle); + if (from_service == nullptr || to_service == nullptr || from_service == to_service) { + // Service handle is bad + char badreq_rep[] = { DINIT_RP_BADREQ }; + if (! queue_packet(badreq_rep, 1)) return false; + bad_conn_close = true; + iob.set_watches(OUT_EVENTS); + return true; + } + + // Check dependency type is valid: + int dep_type_int = rbuf[1]; + if (! contains({(int)dependency_type::MILESTONE, (int)dependency_type::REGULAR, + (int)dependency_type::WAITS_FOR}, dep_type_int)) { + char badreqRep[] = { DINIT_RP_BADREQ }; + if (! queue_packet(badreqRep, 1)) return false; + bad_conn_close = true; + iob.set_watches(OUT_EVENTS); + } + dependency_type dep_type = static_cast(dep_type_int); + + // Check current service states are valid for given dep type + if (dep_type == dependency_type::REGULAR) { + if (from_service->get_state() != service_state_t::STOPPED && + to_service->get_state() != service_state_t::STARTED) { + // Cannot create dependency now since it would be contradicted: + char nak_rep[] = { DINIT_RP_NAK }; + if (! queue_packet(nak_rep, 1)) return false; + rbuf.consume(pkt_size); + chklen = 0; + return true; + } + } + + // Check for creation of circular dependency chain + std::unordered_set dep_marks; + std::vector dep_queue; + dep_queue.push_back(to_service); + while (! dep_queue.empty()) { + service_record * sr = dep_queue.back(); + dep_queue.pop_back(); + // iterate deps; if dep == from, abort; otherwise add to set/queue + // (only add to queue if not already in set) + for (auto &dep : sr->get_dependencies()) { + service_record * dep_to = dep.get_to(); + if (dep_to == from_service) { + // fail, circular dependency! + char nak_rep[] = { DINIT_RP_NAK }; + if (! queue_packet(nak_rep, 1)) return false; + rbuf.consume(pkt_size); + chklen = 0; + return true; + } + if (dep_marks.insert(dep_to).second) { + dep_queue.push_back(dep_to); + } + } + } + dep_marks.clear(); + dep_queue.clear(); + + // Prevent creation of duplicate dependency: + for (auto &dep : from_service->get_dependencies()) { + service_record * dep_to = dep.get_to(); + if (dep_to == to_service && dep.dep_type == dep_type) { + // Dependency already exists: return success + char ack_rep[] = { DINIT_RP_ACK }; + if (! queue_packet(ack_rep, 1)) return false; + rbuf.consume(pkt_size); + chklen = 0; + return true; + } + } + + // Create dependency: + from_service->add_dep(to_service, dep_type); + services->process_queues(); + + char ack_rep[] = { DINIT_RP_ACK }; + if (! queue_packet(ack_rep, 1)) return false; + rbuf.consume(pkt_size); + chklen = 0; + return true; +} + control_conn_t::handle_t control_conn_t::allocate_service_handle(service_record *record) { // Try to find a unique handle (integer) in a single pass. Since the map is ordered, we can search until diff --git a/src/includes/control-cmds.h b/src/includes/control-cmds.h index 088e0d5..800a6cb 100644 --- a/src/includes/control-cmds.h +++ b/src/includes/control-cmds.h @@ -29,7 +29,9 @@ constexpr static int DINIT_CP_UNLOADSERVICE = 9; constexpr static int DINIT_CP_SHUTDOWN = 10; // followed by 1-byte shutdown type - +// Add/remove dependency to existing service: +constexpr static int DINIT_CP_ADD_DEP = 11; +constexpr static int DINIT_CP_REM_DEP = 12; // Replies: diff --git a/src/includes/control.h b/src/includes/control.h index ee9d9f8..e1cfd5c 100644 --- a/src/includes/control.h +++ b/src/includes/control.h @@ -146,6 +146,8 @@ class control_conn_t : private service_listener bool list_services(); + bool add_service_dep(); + // Notify that data is ready to be read from the socket. Returns true if the connection should // be closed. bool data_ready() noexcept; diff --git a/src/includes/service.h b/src/includes/service.h index bc3becc..0c4b0cc 100644 --- a/src/includes/service.h +++ b/src/includes/service.h @@ -202,12 +202,12 @@ class service_dep : from(from), to(to), waiting_on(false), holding_acq(false), dep_type(dep_type_p) { } - service_record * get_from() noexcept + service_record * get_from() const noexcept { return from; } - service_record * get_to() noexcept + service_record * get_to() const noexcept { return to; } @@ -646,6 +646,33 @@ class service_record { return 0; } + + const dep_list & get_dependencies() + { + return depends_on; + } + + // Add a dependency. Caller must ensure that the services are in an appropriate state and that + // a circular dependency chain is not created. Propagation queues should be processed after + // calling this. May throw std::bad_alloc. + void add_dep(service_record *to, dependency_type dep_type) + { + depends_on.emplace_back(this, to, dep_type); + try { + to->dependents.push_back(& depends_on.back()); + } + catch (...) { + depends_on.pop_back(); + throw; + } + + if (dep_type == dependency_type::REGULAR) { + if (service_state == service_state_t::STARTING || service_state == service_state_t::STARTED) { + to->require(); + depends_on.back().holding_acq = true; + } + } + } }; inline auto extract_prop_queue(service_record *sr) -> decltype(sr->prop_queue_node) & -- 2.25.1