From: Davin McCall Date: Fri, 6 Apr 2018 18:25:04 +0000 (+0100) Subject: shutdown: push all output through a cyclic buffer. X-Git-Tag: v0.2.0~26 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=ec7899acefb40a7438114224abf7de8982829ae5;p=oweals%2Fdinit.git shutdown: push all output through a cyclic buffer. Push all output, including from subprocesses, through a cyclic buffer, and discard buffer contents if it cannot be output fast enough. I.e. shutdown should not hang if the console is blocked (such as due to a hardware issue with serial console, or user having pressed ctrl+S to stop console output on a VT). --- diff --git a/TODO b/TODO index 214d38c..653604f 100644 --- a/TODO +++ b/TODO @@ -1,9 +1,5 @@ For version 0.2.0: ------------------ -* shutdown command presently hangs if terminal output blocked (scroll lock - via ^S). Should use a buffer as dinit does, and pipe output from subcommands - via the buffer too - if the buffer becomes full, discard subcommand output - and continue to consume it. * Load services from several different directories, with an order of precedence, to allow for packaged service descriptions and user-modified service descriptions. diff --git a/src/includes/cpbuffer.h b/src/includes/cpbuffer.h index 9019f4a..a47a9c4 100644 --- a/src/includes/cpbuffer.h +++ b/src/includes/cpbuffer.h @@ -47,7 +47,7 @@ template class cpbuffer } } - // fill by reading from the given fd, return positive if some was read or -1 on error. + // Fill by reading from the given fd, return positive if some was read or -1 on error. int fill(int fd) noexcept { int pos = cur_idx + length; @@ -60,6 +60,21 @@ template class cpbuffer return r; } + // Fill by reading up to the specified amount of bytes from the given fd, + // Return is the number of bytes read, 0 on end-of-file or -1 on error. + int fill(int fd, int limit) noexcept + { + int pos = cur_idx + length; + if (pos >= SIZE) pos -= SIZE; + int max_count = std::min(SIZE - pos, SIZE - length); + max_count = std::min(max_count, limit); + ssize_t r = read(fd, buf + pos, max_count); + if (r >= 0) { + length += r; + } + return r; + } + // fill by reading from the given fd, until at least the specified number of bytes are in // the buffer. Return 0 if end-of-file reached before fill complete, or -1 on error. int fill_to(int fd, int rlength) noexcept diff --git a/src/shutdown.cc b/src/shutdown.cc index 6ba4416..511553f 100644 --- a/src/shutdown.cc +++ b/src/shutdown.cc @@ -19,12 +19,219 @@ #include "service-constants.h" #include "dinit-client.h" +#include "dasynq.h" + // shutdown: shut down the system // This utility communicates with the dinit daemon via a unix socket (/dev/initctl). +using loop_t = dasynq::event_loop_n; +using rearm = dasynq::rearm; +using clock_type = dasynq::clock_type; +class subproc_buffer; + void do_system_shutdown(shutdown_type_t shutdown_type); -static void unmount_disks(); -static void swap_off(); +static void unmount_disks(loop_t &loop, subproc_buffer &sub_buf); +static void swap_off(loop_t &loop, subproc_buffer &sub_buf); + +constexpr static int subproc_bufsize = 4096; + +constexpr static char output_lost_msg[] = "[Some output has not been shown due to buffer overflow]\n"; + +// A buffer which maintains a series of overflow markers, used for capturing and echoing +// subprocess output. +class subproc_buffer : private cpbuffer +{ + using base = cpbuffer; + + int overflow_marker = -1; + int last_overflow = -1; // last marker in the series + const char *overflow_msg_ptr = nullptr; // current position in overflow message + dasynq::event_loop_n &loop; + dasynq::event_loop_n::fd_watcher * out_watch; + + public: + enum class fill_status + { + OK, + ENDFILE, + FULL + }; + + subproc_buffer(dasynq::event_loop_n &loop_p, int out_fd) : loop(loop_p) + { + using loop_t = dasynq::event_loop_n; + using rearm = dasynq::rearm; + + out_watch = loop_t::fd_watcher::add_watch(loop, out_fd, dasynq::OUT_EVENTS, + [&](loop_t &eloop, int fd, int flags) -> rearm { + + auto fstatus = flush(STDOUT_FILENO); + if (fstatus == subproc_buffer::fill_status::ENDFILE) { + return rearm::DISARM; + } + + return rearm::REARM; + }); + } + + ~subproc_buffer() + { + out_watch->deregister(loop); + } + + // Fill buffer by reading from file descriptor. Note caller must set overflow marker + // if the buffer becomes full and more data is available. + fill_status fill(int fd) + { + int rem = get_free(); + + if (rem == 0) { + return fill_status::FULL; + } + + int read = base::fill(fd, rem); + if (read <= 0) { + if (read == -1 && errno == EAGAIN) { + return fill_status::OK; + } + return fill_status::ENDFILE; + } + + out_watch->set_enabled(loop, true); + return fill_status::OK; + } + + // Append a message. If the message will not fit in the buffer, discard it and mark overflow. + void append(const char *msg) + { + out_watch->set_enabled(loop, true); + int len = strlen(msg); + if (subproc_bufsize - get_length() >= len) { + base::append(msg, len); + } + else { + mark_overflow(); + } + } + + // Append the given buffer, which must fit in the remaining space in this buffer. + void append(const char *buf, int len) + { + out_watch->set_enabled(loop, true); + base::append(buf, len); + } + + int get_free() + { + return base::get_free(); + } + + // Write buffer contents out to file descriptor. The descriptor is assumed to be non-blocking. + // returns ENDFILE if there is no more content to flush (buffer is now empty) or OK otherwise. + fill_status flush(int fd) + { + int to_write = get_contiguous_length(get_ptr(0)); + + if (overflow_marker != -1) { + if (overflow_marker == 0) { + // output (remainder of) overflow message + int l = std::strlen(overflow_msg_ptr); + int r = write(fd, overflow_msg_ptr, l); + if (r == l) { + // entire message has been written; next marker is in buffer + int16_t overflow_marker16; + extract(reinterpret_cast(&overflow_marker16), 0, sizeof(overflow_marker16)); + overflow_marker = overflow_marker16; + consume(sizeof(overflow_marker16)); + + // no more overflow markers? + if (overflow_marker == -1) { + last_overflow = -1; + } + return get_length() == 0 ? fill_status::ENDFILE : fill_status::OK; + } + if (r > 0) { + overflow_msg_ptr += r; + } + return fill_status::OK; + } + + to_write = std::min(to_write, overflow_marker); + } + + int r = write(fd, get_ptr(0), to_write); + if (r > 0) { + consume(r); + if (overflow_marker != -1) { + overflow_marker -= r; + last_overflow -= r; + if (overflow_marker == 0) { + overflow_msg_ptr = output_lost_msg; + } + } + } + + return get_length() == 0 ? fill_status::ENDFILE : fill_status::OK; + } + + // Mark overflow occurred. Call this only when the buffer is full. + // The marker is put after the most recent newline in the buffer, if possible, so that whole + // lines are retained in the buffer. In some cases marking overflow will not add a new overflow + // marker but simply trim the buffer to an existing marker. + void mark_overflow() + { + // Try to find the last newline in the buffer + int begin = 0; + if (last_overflow != -1) { + begin = last_overflow + sizeof(int16_t); + } + int end = get_length() - 1 - sizeof(int16_t); // -1, then -2 for storage of marker + + int i; + for (i = end; i >= begin; i--) { + if ((*this)[i] == '\n') break; + } + + if (last_overflow != -1 && i < begin) { + // No new line after existing marker: trim all beyond that marker, don't + // create a new marker: + trim_to(last_overflow + sizeof(uint16_t)); + return; + } + + if (i < begin) { + // No newline in the whole buffer... we'll put the overflow marker at the end, + // on the assumption that it is better to output a partial line than it is to + // discard the entire buffer: + last_overflow = get_length() - sizeof(int16_t); + overflow_marker = last_overflow; + int16_t overflow16 = -1; + char * overflow16_ptr = reinterpret_cast(&overflow16); + *get_ptr(last_overflow + 0) = overflow16_ptr[0]; + *get_ptr(last_overflow + 1) = overflow16_ptr[1]; + return; + } + + // We found a newline, put the overflow marker just after it: + int new_overflow = i + 1; + if (last_overflow != -1) { + int16_t new_overflow16 = new_overflow; + char * new_overflow16_ptr = reinterpret_cast(&new_overflow16); + *get_ptr(last_overflow + 0) = new_overflow16_ptr[0]; + *get_ptr(last_overflow + 1) = new_overflow16_ptr[1]; + } + last_overflow = new_overflow; + if (overflow_marker == -1) { + overflow_marker = last_overflow; + } + + int16_t overflow16 = -1; + char * overflow16_ptr = reinterpret_cast(&overflow16); + *get_ptr(last_overflow + 0) = overflow16_ptr[0]; + *get_ptr(last_overflow + 0) = overflow16_ptr[1]; + trim_to(last_overflow + sizeof(int16_t)); + } +}; int main(int argc, char **argv) @@ -190,10 +397,15 @@ void do_system_shutdown(shutdown_type_t shutdown_type) sigfillset(&allsigs); sigprocmask(SIG_SETMASK, &allsigs, nullptr); - int reboot_type = 0; - if (shutdown_type == shutdown_type_t::REBOOT) reboot_type = RB_AUTOBOOT; - else if (shutdown_type == shutdown_type_t::POWEROFF) reboot_type = RB_POWER_OFF; - else reboot_type = RB_HALT_SYSTEM; + int reboot_type = RB_AUTOBOOT; // reboot +#if defined(RB_POWER_OFF) + if (shutdown_type == shutdown_type_t::POWEROFF) reboot_type = RB_POWER_OFF; +#endif +#if defined(RB_HALT_SYSTEM) + if (shutdown_type == shutdown_type_t::HALT) reboot_type = RB_HALT_SYSTEM; +#elif defined(RB_HALT) + if (shutdown_type == shutdown_type_t::HALT) reboot_type = RB_HALT; +#endif // Write to console rather than any terminal, since we lose the terminal it seems: close(STDOUT_FILENO); @@ -202,48 +414,156 @@ void do_system_shutdown(shutdown_type_t shutdown_type) dup2(consfd, STDOUT_FILENO); } - cout << "Sending TERM/KILL to all processes..." << endl; + loop_t loop; + subproc_buffer sub_buf {loop, STDOUT_FILENO}; + + sub_buf.append("Sending TERM/KILL to all processes...\n"); // Send TERM/KILL to all (remaining) processes kill(-1, SIGTERM); - sleep(1); + + // 1 second delay (while outputting from sub_buf): + bool timeout_reached = false; + dasynq::time_val timeout {1, 0}; + dasynq::time_val interval {0,0}; + loop_t::timer::add_timer(loop, clock_type::MONOTONIC, true /* relative */, + timeout.get_timespec(), interval.get_timespec(), + [&](loop_t &eloop, int expiry_count) -> rearm { + + timeout_reached = true; + return rearm::REMOVE; + }); + + do { + loop.run(); + } while (! timeout_reached); + kill(-1, SIGKILL); // perform shutdown - cout << "Turning off swap..." << endl; - swap_off(); - cout << "Unmounting disks..." << endl; - unmount_disks(); + sub_buf.append("Turning off swap...\n"); + swap_off(loop, sub_buf); + sub_buf.append("Unmounting disks...\n"); + unmount_disks(loop, sub_buf); sync(); - cout << "Issuing shutdown via kernel..." << endl; + sub_buf.append("Issuing shutdown via kernel...\n"); + loop.poll(); // give message a chance to get to console reboot(reboot_type); } -static void unmount_disks() +// Watcher for subprocess output. +class subproc_out_watch : public loop_t::fd_watcher_impl { - pid_t chpid = fork(); - if (chpid == 0) { - // umount -a -r - // -a : all filesystems (except proc) - // -r : mount readonly if can't unmount - execl("/bin/umount", "/bin/umount", "-a", "-r", nullptr); + subproc_buffer &sub_buf; + bool in_overflow = false; + + rearm read_overflow(int fd) + { + char buf[128]; + int r = read(fd, buf, 128); + if (r == 0 || (r == -1 && errno != EAGAIN)) { + return rearm::NOOP; // leave disarmed + } + if (r == -1) { + return rearm::REARM; + } + + // How much space is available? + int fr = sub_buf.get_free(); + for (int b = r - std::min(r, fr); b < r; b++) { + if (buf[b] == '\n') { + // Copy the (partial) line into sub_buf and leave overflow mode + sub_buf.append(buf + b, r - b); + in_overflow = false; + } + } + return rearm::REARM; } - else if (chpid > 0) { - int status; - waitpid(chpid, &status, 0); + + public: + subproc_out_watch(subproc_buffer &sub_buf_p) : sub_buf(sub_buf_p) {} + + rearm fd_event(loop_t &, int fd, int flags) + { + // if current status is reading overflow, read and discard until newline + if (in_overflow) { + return read_overflow(fd); + } + + auto r = sub_buf.fill(fd); + if (r == subproc_buffer::fill_status::FULL) { + sub_buf.mark_overflow(); + in_overflow = true; + return read_overflow(fd); + } + else if (r == subproc_buffer::fill_status::ENDFILE) { + return rearm::NOOP; + } + + return rearm::REARM; // re-enable watcher } -} +}; -static void swap_off() +// Run process, put its output through the subprocess buffer +// may throw: std::system_error, std::bad_alloc +static void run_process(const char * prog_args[], loop_t &loop, subproc_buffer &sub_buf) { - pid_t chpid = fork(); - if (chpid == 0) { - // swapoff -a - execl("/sbin/swapoff", "/sbin/swapoff", "-a", nullptr); + class sp_watcher_t : public loop_t::child_proc_watcher_impl + { + public: + bool terminated = false; + + rearm status_change(loop_t &, pid_t child, int status) + { + terminated = true; + return rearm::REMOVE; + } + }; + + sp_watcher_t sp_watcher; + + // Create output pipe + int pipefds[2]; + if (dasynq::pipe2(pipefds, O_NONBLOCK) == -1) { + // TODO + std::cout << "*** pipe2 failed ***" << std::endl; } - else if (chpid > 0) { - int status; - waitpid(chpid, &status, 0); + + pid_t ch_pid = sp_watcher.fork(loop); + if (ch_pid == 0) { + // child + // Dup output pipe to stdout, stderr + dup2(pipefds[1], STDOUT_FILENO); + dup2(pipefds[1], STDERR_FILENO); + close(pipefds[0]); + close(pipefds[1]); + execv(prog_args[0], const_cast(prog_args)); + puts("Failed to execute subprocess:\n"); + perror(prog_args[0]); + _exit(1); } + + close(pipefds[1]); + + subproc_out_watch owatch {sub_buf}; + owatch.add_watch(loop, pipefds[0], dasynq::IN_EVENTS); + + do { + loop.run(); + } while (! sp_watcher.terminated); + + owatch.deregister(loop); +} + +static void unmount_disks(loop_t &loop, subproc_buffer &sub_buf) +{ + const char * unmount_args[] = { "/bin/umount", "-a", "-r", nullptr }; + run_process(unmount_args, loop, sub_buf); +} + +static void swap_off(loop_t &loop, subproc_buffer &sub_buf) +{ + const char * swapoff_args[] = { "/sbin/swapoff", "-a", nullptr }; + run_process(swapoff_args, loop, sub_buf); }