2 #include <unordered_set>
8 // Server-side control protocol implementation. This implements the functionality that allows
9 // clients (such as dinitctl) to query service state and issue commands to control services.
12 constexpr auto OUT_EVENTS = dasynq::OUT_EVENTS;
13 constexpr auto IN_EVENTS = dasynq::IN_EVENTS;
15 // Control protocol minimum compatible version and current version:
16 constexpr uint16_t min_compat_version = 1;
17 constexpr uint16_t cp_version = 1;
19 // check for value in a set
20 template <typename T, int N, typename U>
21 inline bool contains(const T (&v)[N], U i)
23 return std::find_if(std::begin(v), std::end(v),
24 [=](T p){ return i == static_cast<U>(p); }) != std::end(v);
28 bool control_conn_t::process_packet()
32 // Note that where we call queue_packet, we must generally check the return value. If it
33 // returns false it has either deleted the connection or marked it for deletion; we
34 // shouldn't touch instance members after that point.
36 int pktType = rbuf[0];
37 if (pktType == DINIT_CP_QUERYVERSION) {
39 // DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) actual version
40 char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
41 memcpy(replyBuf + 1, &min_compat_version, 2);
42 memcpy(replyBuf + 3, &cp_version, 2);
43 if (! queue_packet(replyBuf, sizeof(replyBuf))) return false;
47 if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
48 return process_find_load(pktType);
50 if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE
51 || pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) {
52 return process_start_stop(pktType);
54 if (pktType == DINIT_CP_UNPINSERVICE) {
55 return process_unpin_service();
57 if (pktType == DINIT_CP_UNLOADSERVICE) {
58 return process_unload_service();
60 if (pktType == DINIT_CP_SHUTDOWN) {
62 if (rbuf.get_length() < 2) {
67 if (contains({shutdown_type_t::CONTINUE, shutdown_type_t::HALT,
68 shutdown_type_t::POWEROFF, shutdown_type_t::REBOOT}, rbuf[1])) {
69 auto sd_type = static_cast<shutdown_type_t>(rbuf[1]);
71 services->stop_all_services(sd_type);
72 char ackBuf[] = { DINIT_RP_ACK };
73 if (! queue_packet(ackBuf, 1)) return false;
75 // Clear the packet from the buffer
81 // (otherwise fall through to below).
83 if (pktType == DINIT_CP_LISTSERVICES) {
84 return list_services();
86 if (pktType == DINIT_CP_ADD_DEP) {
87 return add_service_dep();
89 if (pktType == DINIT_CP_REM_DEP) {
90 return rm_service_dep();
92 if (pktType == DINIT_CP_QUERY_LOAD_MECH) {
93 return query_load_mech();
95 if (pktType == DINIT_CP_ENABLESERVICE) {
96 return add_service_dep(true);
99 // Unrecognized: give error response
100 char outbuf[] = { DINIT_RP_BADREQ };
101 if (! queue_packet(outbuf, 1)) return false;
102 bad_conn_close = true;
103 iob.set_watches(OUT_EVENTS);
107 bool control_conn_t::process_find_load(int pktType)
111 constexpr int pkt_size = 4;
113 if (rbuf.get_length() < pkt_size) {
119 rbuf.extract((char *)&svcSize, 1, 2);
120 chklen = svcSize + 3; // packet type + (2 byte) length + service name
121 if (svcSize <= 0 || chklen > 1024) {
122 // Queue error response / mark connection bad
123 char badreqRep[] = { DINIT_RP_BADREQ };
124 if (! queue_packet(badreqRep, 1)) return false;
125 bad_conn_close = true;
126 iob.set_watches(OUT_EVENTS);
130 if (rbuf.get_length() < chklen) {
131 // packet not complete yet; read more
135 service_record * record = nullptr;
137 string serviceName = rbuf.extract_string(3, svcSize);
139 if (pktType == DINIT_CP_LOADSERVICE) {
142 record = services->load_service(serviceName.c_str());
144 catch (service_load_exc &slexc) {
145 log(loglevel_t::ERROR, "Could not load service ", slexc.serviceName, ": ", slexc.excDescription);
150 record = services->find_service(serviceName.c_str());
153 if (record != nullptr) {
154 // Allocate a service handle
155 handle_t handle = allocate_service_handle(record);
156 std::vector<char> rp_buf;
158 rp_buf.push_back(DINIT_RP_SERVICERECORD);
159 rp_buf.push_back(static_cast<char>(record->get_state()));
160 for (int i = 0; i < (int) sizeof(handle); i++) {
161 rp_buf.push_back(*(((char *) &handle) + i));
163 rp_buf.push_back(static_cast<char>(record->get_target_state()));
164 if (! queue_packet(std::move(rp_buf))) return false;
167 std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
168 if (! queue_packet(std::move(rp_buf))) return false;
171 // Clear the packet from the buffer
172 rbuf.consume(chklen);
177 bool control_conn_t::process_start_stop(int pktType)
181 constexpr int pkt_size = 2 + sizeof(handle_t);
183 if (rbuf.get_length() < pkt_size) {
188 // 1 byte: packet type
189 // 1 byte: pin in requested state (0 = no pin, 1 = pin)
190 // 4 bytes: service handle
192 bool do_pin = (rbuf[1] == 1);
194 rbuf.extract((char *) &handle, 2, sizeof(handle));
196 service_record *service = find_service_for_key(handle);
197 if (service == nullptr) {
198 // Service handle is bad
199 char badreqRep[] = { DINIT_RP_BADREQ };
200 if (! queue_packet(badreqRep, 1)) return false;
201 bad_conn_close = true;
202 iob.set_watches(OUT_EVENTS);
206 bool already_there = false;
209 case DINIT_CP_STARTSERVICE:
210 // start service, mark as required
211 if (do_pin) service->pin_start();
213 services->process_queues();
214 already_there = service->get_state() == service_state_t::STARTED;
216 case DINIT_CP_STOPSERVICE:
217 // force service to stop
218 if (do_pin) service->pin_stop();
220 service->forced_stop();
221 services->process_queues();
222 already_there = service->get_state() == service_state_t::STOPPED;
224 case DINIT_CP_WAKESERVICE:
225 // re-start a stopped service (do not mark as required)
226 if (do_pin) service->pin_start();
227 service->start(false);
228 services->process_queues();
229 already_there = service->get_state() == service_state_t::STARTED;
231 case DINIT_CP_RELEASESERVICE:
232 // remove required mark, stop if not required by dependents
233 if (do_pin) service->pin_stop();
234 service->stop(false);
235 services->process_queues();
236 already_there = service->get_state() == service_state_t::STOPPED;
240 char ack_buf[] = { (char)(already_there ? DINIT_RP_ALREADYSS : DINIT_RP_ACK) };
242 if (! queue_packet(ack_buf, 1)) return false;
245 // Clear the packet from the buffer
246 rbuf.consume(pkt_size);
251 bool control_conn_t::process_unpin_service()
255 constexpr int pkt_size = 1 + sizeof(handle_t);
257 if (rbuf.get_length() < pkt_size) {
262 // 1 byte: packet type
263 // 4 bytes: service handle
266 rbuf.extract((char *) &handle, 1, sizeof(handle));
268 service_record *service = find_service_for_key(handle);
269 if (service == nullptr) {
270 // Service handle is bad
271 char badreqRep[] = { DINIT_RP_BADREQ };
272 if (! queue_packet(badreqRep, 1)) return false;
273 bad_conn_close = true;
274 iob.set_watches(OUT_EVENTS);
279 services->process_queues();
280 char ack_buf[] = { (char) DINIT_RP_ACK };
281 if (! queue_packet(ack_buf, 1)) return false;
283 // Clear the packet from the buffer
284 rbuf.consume(pkt_size);
289 bool control_conn_t::process_unload_service()
293 constexpr int pkt_size = 1 + sizeof(handle_t);
295 if (rbuf.get_length() < pkt_size) {
300 // 1 byte: packet type
301 // 4 bytes: service handle
304 rbuf.extract((char *) &handle, 1, sizeof(handle));
306 service_record *service = find_service_for_key(handle);
307 if (service == nullptr) {
308 // Service handle is bad
309 char badreq_rep[] = { DINIT_RP_BADREQ };
310 if (! queue_packet(badreq_rep, 1)) return false;
311 bad_conn_close = true;
312 iob.set_watches(OUT_EVENTS);
316 if (! service->has_lone_ref() || service->get_state() != service_state_t::STOPPED) {
317 // Cannot unload: has other references
318 char nak_rep[] = { DINIT_RP_NAK };
319 if (! queue_packet(nak_rep, 1)) return false;
323 service->prepare_for_unload();
324 services->remove_service(service);
328 service_key_map.erase(service);
329 key_service_map.erase(handle);
332 char ack_buf[] = { (char) DINIT_RP_ACK };
333 if (! queue_packet(ack_buf, 1)) return false;
336 // Clear the packet from the buffer
337 rbuf.consume(pkt_size);
342 bool control_conn_t::list_services()
344 rbuf.consume(1); // clear request packet
348 auto slist = services->list_services();
349 for (auto sptr : slist) {
350 std::vector<char> pkt_buf;
352 int hdrsize = 8 + std::max(sizeof(int), sizeof(pid_t));
354 const std::string &name = sptr->get_name();
355 int nameLen = std::min((size_t)256, name.length());
356 pkt_buf.resize(hdrsize + nameLen);
358 pkt_buf[0] = DINIT_RP_SVCINFO;
359 pkt_buf[1] = nameLen;
360 pkt_buf[2] = static_cast<char>(sptr->get_state());
361 pkt_buf[3] = static_cast<char>(sptr->get_target_state());
363 char b0 = sptr->is_waiting_for_console() ? 1 : 0;
364 b0 |= sptr->has_console() ? 2 : 0;
365 b0 |= sptr->was_start_skipped() ? 4 : 0;
367 pkt_buf[5] = static_cast<char>(sptr->get_stop_reason());
369 pkt_buf[6] = 0; // reserved
372 // Next: either the exit status, or the process ID
373 if (sptr->get_state() != service_state_t::STOPPED) {
374 pid_t proc_pid = sptr->get_pid();
375 memcpy(pkt_buf.data() + 8, &proc_pid, sizeof(proc_pid));
378 int exit_status = sptr->get_exit_status();
379 memcpy(pkt_buf.data() + 8, &exit_status, sizeof(exit_status));
382 for (int i = 0; i < nameLen; i++) {
383 pkt_buf[hdrsize+i] = name[i];
386 if (! queue_packet(std::move(pkt_buf))) return false;
389 char ack_buf[] = { (char) DINIT_RP_LISTDONE };
390 if (! queue_packet(ack_buf, 1)) return false;
394 catch (std::bad_alloc &exc)
401 bool control_conn_t::add_service_dep(bool do_enable)
403 // 1 byte packet type
404 // 1 byte dependency type
408 constexpr int pkt_size = 2 + sizeof(handle_t) * 2;
410 if (rbuf.get_length() < pkt_size) {
415 handle_t from_handle;
417 rbuf.extract((char *) &from_handle, 2, sizeof(from_handle));
418 rbuf.extract((char *) &to_handle, 2 + sizeof(from_handle), sizeof(to_handle));
420 service_record *from_service = find_service_for_key(from_handle);
421 service_record *to_service = find_service_for_key(to_handle);
422 if (from_service == nullptr || to_service == nullptr || from_service == to_service) {
423 // Service handle is bad
424 char badreq_rep[] = { DINIT_RP_BADREQ };
425 if (! queue_packet(badreq_rep, 1)) return false;
426 bad_conn_close = true;
427 iob.set_watches(OUT_EVENTS);
431 // Check dependency type is valid:
432 int dep_type_int = rbuf[1];
433 if (! contains({dependency_type::MILESTONE, dependency_type::REGULAR,
434 dependency_type::WAITS_FOR}, dep_type_int)) {
435 char badreqRep[] = { DINIT_RP_BADREQ };
436 if (! queue_packet(badreqRep, 1)) return false;
437 bad_conn_close = true;
438 iob.set_watches(OUT_EVENTS);
440 dependency_type dep_type = static_cast<dependency_type>(dep_type_int);
442 // Check current service states are valid for given dep type
443 if (dep_type == dependency_type::REGULAR) {
444 if (from_service->get_state() != service_state_t::STOPPED &&
445 to_service->get_state() != service_state_t::STARTED) {
446 // Cannot create dependency now since it would be contradicted:
447 char nak_rep[] = { DINIT_RP_NAK };
448 if (! queue_packet(nak_rep, 1)) return false;
449 rbuf.consume(pkt_size);
455 // Check for creation of circular dependency chain
456 std::unordered_set<service_record *> dep_marks;
457 std::vector<service_record *> dep_queue;
458 dep_queue.push_back(to_service);
459 while (! dep_queue.empty()) {
460 service_record * sr = dep_queue.back();
461 dep_queue.pop_back();
462 // iterate deps; if dep == from, abort; otherwise add to set/queue
463 // (only add to queue if not already in set)
464 for (auto &dep : sr->get_dependencies()) {
465 service_record * dep_to = dep.get_to();
466 if (dep_to == from_service) {
467 // fail, circular dependency!
468 char nak_rep[] = { DINIT_RP_NAK };
469 if (! queue_packet(nak_rep, 1)) return false;
470 rbuf.consume(pkt_size);
474 if (dep_marks.insert(dep_to).second) {
475 dep_queue.push_back(dep_to);
482 bool dep_exists = false;
483 service_dep * dep_record = nullptr;
485 // Prevent creation of duplicate dependency:
486 for (auto &dep : from_service->get_dependencies()) {
487 service_record * dep_to = dep.get_to();
488 if (dep_to == to_service && dep.dep_type == dep_type) {
489 // Dependency already exists
497 // Create dependency:
498 dep_record = &(from_service->add_dep(to_service, dep_type));
499 services->process_queues();
502 if (do_enable && contains({service_state_t::STARTED, service_state_t::STARTING},
503 from_service->get_state())) {
504 // The dependency record is activated: mark it as holding acquisition of the dependency, and start
506 dep_record->get_from()->start_dep(*dep_record);
507 services->process_queues();
510 char ack_rep[] = { DINIT_RP_ACK };
511 if (! queue_packet(ack_rep, 1)) return false;
512 rbuf.consume(pkt_size);
517 bool control_conn_t::rm_service_dep()
519 // 1 byte packet type
520 // 1 byte dependency type
524 constexpr int pkt_size = 2 + sizeof(handle_t) * 2;
526 if (rbuf.get_length() < pkt_size) {
531 handle_t from_handle;
533 rbuf.extract((char *) &from_handle, 2, sizeof(from_handle));
534 rbuf.extract((char *) &to_handle, 2 + sizeof(from_handle), sizeof(to_handle));
536 service_record *from_service = find_service_for_key(from_handle);
537 service_record *to_service = find_service_for_key(to_handle);
538 if (from_service == nullptr || to_service == nullptr || from_service == to_service) {
539 // Service handle is bad
540 char badreq_rep[] = { DINIT_RP_BADREQ };
541 if (! queue_packet(badreq_rep, 1)) return false;
542 bad_conn_close = true;
543 iob.set_watches(OUT_EVENTS);
547 // Check dependency type is valid:
548 int dep_type_int = rbuf[1];
549 if (! contains({dependency_type::MILESTONE, dependency_type::REGULAR,
550 dependency_type::WAITS_FOR}, dep_type_int)) {
551 char badreqRep[] = { DINIT_RP_BADREQ };
552 if (! queue_packet(badreqRep, 1)) return false;
553 bad_conn_close = true;
554 iob.set_watches(OUT_EVENTS);
556 dependency_type dep_type = static_cast<dependency_type>(dep_type_int);
558 // Remove dependency:
559 from_service->rm_dep(to_service, dep_type);
560 services->process_queues();
562 char ack_rep[] = { DINIT_RP_ACK };
563 if (! queue_packet(ack_rep, 1)) return false;
564 rbuf.consume(pkt_size);
569 bool control_conn_t::query_load_mech()
574 if (services->get_set_type_id() == SSET_TYPE_DIRLOAD) {
575 dirload_service_set *dss = static_cast<dirload_service_set *>(services);
576 std::vector<char> reppkt;
577 reppkt.resize(2 + sizeof(uint32_t) * 2); // packet type, loader type, packet size, # dirs
578 reppkt[0] = DINIT_RP_LOADER_MECH;
579 reppkt[1] = SSET_TYPE_DIRLOAD;
581 // Number of directories in load path:
582 uint32_t sdirs = dss->get_service_dir_count();
583 std::memcpy(reppkt.data() + 2 + sizeof(uint32_t), &sdirs, sizeof(sdirs));
585 // Our current working directory, which above are relative to:
586 // leave sizeof(uint32_t) for size, which we'll fill in afterwards:
587 std::size_t curpos = reppkt.size() + sizeof(uint32_t);
589 uint32_t try_path_size = PATH_MAX;
591 uint32_t try_path_size = 2048;
595 std::size_t total_size = curpos + std::size_t(try_path_size);
596 reppkt.resize(total_size);
597 if (total_size < curpos) {
599 char ack_rep[] = { DINIT_RP_NAK };
600 if (! queue_packet(ack_rep, 1)) return false;
603 wd = getcwd(reppkt.data() + curpos, try_path_size);
604 if (wd != nullptr) break;
606 try_path_size *= uint32_t(2u);
607 if (try_path_size == 0) {
609 char ack_rep[] = { DINIT_RP_NAK };
610 if (! queue_packet(ack_rep, 1)) return false;
615 uint32_t wd_len = std::strlen(reppkt.data() + curpos);
616 reppkt.resize(curpos + std::size_t(wd_len));
617 std::memcpy(reppkt.data() + curpos - sizeof(uint32_t), &wd_len, sizeof(wd_len));
619 // Each directory in the load path:
620 for (int i = 0; uint32_t(i) < sdirs; i++) {
621 const char *sdir = dss->get_service_dir(i);
622 uint32_t dlen = std::strlen(sdir);
623 auto cursize = reppkt.size();
624 reppkt.resize(cursize + sizeof(dlen) + dlen);
625 std::memcpy(reppkt.data() + cursize, &dlen, sizeof(dlen));
626 std::memcpy(reppkt.data() + cursize + sizeof(dlen), sdir, dlen);
629 // Total packet size:
630 uint32_t fsize = reppkt.size();
631 std::memcpy(reppkt.data() + 2, &fsize, sizeof(fsize));
633 if (! queue_packet(std::move(reppkt))) return false;
637 // If we don't know how to deal with the service set type, send a NAK reply:
638 char ack_rep[] = { DINIT_RP_NAK };
639 if (! queue_packet(ack_rep, 1)) return false;
644 control_conn_t::handle_t control_conn_t::allocate_service_handle(service_record *record)
646 // Try to find a unique handle (integer) in a single pass. Since the map is ordered, we can search until
647 // we find a gap in the handle values.
648 handle_t candidate = 0;
649 for (auto p : key_service_map) {
650 if (p.first == candidate) candidate++;
654 bool is_unique = (service_key_map.find(record) == service_key_map.end());
656 // The following operations perform allocation (can throw std::bad_alloc). If an exception occurs we
657 // must undo any previous actions:
659 record->add_listener(this);
663 key_service_map[candidate] = record;
664 service_key_map.insert(std::make_pair(record, candidate));
668 record->remove_listener(this);
671 key_service_map.erase(candidate);
677 bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
679 int in_flag = bad_conn_close ? 0 : IN_EVENTS;
680 bool was_empty = outbuf.empty();
682 // If the queue is empty, we can try to write the packet out now rather than queueing it.
683 // If the write is unsuccessful or partial, we queue the remainder.
685 int wr = bp_sys::write(iob.get_watched_fd(), pkt, size);
687 if (errno == EPIPE) {
690 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
691 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
694 // EAGAIN etc: fall through to below
697 if ((unsigned)wr == size) {
699 iob.set_watches(in_flag);
707 // Create a vector out of the (remaining part of the) packet:
709 outbuf.emplace_back(pkt, pkt + size);
710 iob.set_watches(in_flag | OUT_EVENTS);
713 catch (std::bad_alloc &baexc) {
714 // Mark the connection bad, and stop reading further requests
715 bad_conn_close = true;
718 // We can't send out-of-memory response as we already wrote as much as we
719 // could above. Neither can we later send the response since we have currently
720 // sent an incomplete packet. All we can do is close the connection.
724 iob.set_watches(OUT_EVENTS);
730 // This queue_packet method is frustratingly similar to the one above, but the subtle differences
731 // make them extraordinary difficult to combine into a single method.
732 bool control_conn_t::queue_packet(std::vector<char> &&pkt) noexcept
734 int in_flag = bad_conn_close ? 0 : IN_EVENTS;
735 bool was_empty = outbuf.empty();
739 // We can try sending the packet immediately:
740 int wr = bp_sys::write(iob.get_watched_fd(), pkt.data(), pkt.size());
742 if (errno == EPIPE) {
745 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
746 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
749 // EAGAIN etc: fall through to below
752 if ((unsigned)wr == pkt.size()) {
754 iob.set_watches(in_flag);
762 outbuf.emplace_back(pkt);
763 iob.set_watches(in_flag | OUT_EVENTS);
766 catch (std::bad_alloc &baexc) {
767 // Mark the connection bad, and stop reading further requests
768 bad_conn_close = true;
771 // We can't send out-of-memory response as we already wrote as much as we
772 // could above. Neither can we later send the response since we have currently
773 // sent an incomplete packet. All we can do is close the connection.
777 iob.set_watches(OUT_EVENTS);
783 bool control_conn_t::rollback_complete() noexcept
785 char ackBuf[2] = { DINIT_ROLLBACK_COMPLETED, 2 };
786 return queue_packet(ackBuf, 2);
789 bool control_conn_t::data_ready() noexcept
791 int fd = iob.get_watched_fd();
793 int r = rbuf.fill(fd);
795 // Note file descriptor is non-blocking
797 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
798 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
809 if (rbuf.get_length() >= chklen) {
811 return !process_packet();
813 catch (std::bad_alloc &baexc) {
818 else if (rbuf.get_length() == 1024) {
820 log(loglevel_t::WARN, "Received too-large control package; dropping connection");
821 bad_conn_close = true;
822 iob.set_watches(OUT_EVENTS);
825 int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
826 iob.set_watches(IN_EVENTS | out_flags);
832 bool control_conn_t::send_data() noexcept
834 if (outbuf.empty() && bad_conn_close) {
837 char oomBuf[] = { DINIT_RP_OOM };
838 bp_sys::write(iob.get_watched_fd(), oomBuf, 1);
843 vector<char> & pkt = outbuf.front();
844 char *data = pkt.data();
845 int written = bp_sys::write(iob.get_watched_fd(), data + outpkt_index, pkt.size() - outpkt_index);
847 if (errno == EPIPE) {
851 else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
852 // spurious readiness notification?
855 log(loglevel_t::ERROR, "Error writing to control connection: ", strerror(errno));
861 outpkt_index += written;
862 if (outpkt_index == pkt.size()) {
863 // We've finished this packet, move on to the next:
866 if (outbuf.empty() && ! oom_close) {
867 if (! bad_conn_close) {
868 iob.set_watches(IN_EVENTS);
879 control_conn_t::~control_conn_t() noexcept
881 bp_sys::close(iob.get_watched_fd());
882 iob.deregister(loop);
884 // Clear service listeners
885 for (auto p : service_key_map) {
886 p.first->remove_listener(this);
889 active_control_conns--;