From 0dbabfefab8eeee52b4b5b74a7557e67f547ffa5 Mon Sep 17 00:00:00 2001 From: Davin McCall Date: Sun, 5 Jun 2016 01:49:23 +0100 Subject: [PATCH] Rip out libev, replace with dasynq (new library written for the purpose). --- TODO | 9 +- src/Makefile | 2 +- src/control.cc | 22 +- src/control.h | 61 ++- src/dasync/dasync-aen.h | 440 +++++++++++++++++++++ src/dasync/dasync.h | 858 ++++++++++++++++++++++++++++++++++++++++ src/dasync/dmutex.h | 61 +++ src/dinit-log.cc | 57 ++- src/dinit.cc | 158 ++++++-- src/service.cc | 62 +-- src/service.h | 43 +- 11 files changed, 1653 insertions(+), 120 deletions(-) create mode 100644 src/dasync/dasync-aen.h create mode 100644 src/dasync/dasync.h create mode 100644 src/dasync/dmutex.h diff --git a/TODO b/TODO index 7aeb2d8..07e9236 100644 --- a/TODO +++ b/TODO @@ -1,3 +1,4 @@ +* CPBuffer: cleanup methods a bit (fix API) * When a PROCESS service process dies, and smooth_recovery is false, probably need to force-stop dependents even if the process itself was stopped deliberately. @@ -6,7 +7,8 @@ - support for listing all services * Implement a control utility to start/stop services after dinit has started - very basic version exists, needs thorough cleanup -* Clean up tree a little, move source files into "src" +* We've replaced libev, so that we don't abort on failure. But now exceptions + can be thrown when we register an event watch - need to handle these. For version 1.0: ---------------- @@ -44,7 +46,8 @@ For later: sends terminated process IDs over a pipe to Dinit. Finally, it may be possible to run dinit (and subprocesses) in a new PID namespace (again linux-only). * Allow logging tasks to memory (growing or circular buffer) and later - switching to disk logging (allows for filesystem mounted readonly on boot) + switching to disk logging (allows for filesystem mounted readonly on boot). + But perhaps this really the responsibility of another daemon. * Rate control on process respawn * Allow running services with different resource limits, chroot, cgroups, namespaces (pid/fs/uid), etc @@ -62,8 +65,6 @@ Even later / Maybe never: factor. * Cron-like tasks (if started, they run a sub-task periodically. Stopping the task will wait until the sub-task is complete). -* Socket activation of services? Not sure if enough non-SystemD derived - daemons actually support this to warrant implementing it. * Allow to run services attached to virtual tty, allow connection to that tty (ala "screen"). * SystemD-like handling of filesystem mounts (see autofs documentation in kernel) i.e. a mount point gets an autofs attached, and lazily gets mounted when accessed diff --git a/src/Makefile b/src/Makefile index 25216ea..2ff0a81 100644 --- a/src/Makefile +++ b/src/Makefile @@ -21,7 +21,7 @@ dinit-reboot: dinit-reboot.o $(CXX) -o dinit-reboot dinit-reboot.o $(objects): %.o: %.cc service.h dinit-log.h control.h control-cmds.h cpbuffer.h - $(CXX) $(CXXOPTS) -c $< -o $@ + $(CXX) $(CXXOPTS) -Idasync -c $< -o $@ #install: all diff --git a/src/control.cc b/src/control.cc index 30dd316..937269c 100644 --- a/src/control.cc +++ b/src/control.cc @@ -54,7 +54,7 @@ void ControlConn::processPacket() char outbuf[] = { DINIT_RP_BADREQ }; if (! queuePacket(outbuf, 1)) return; bad_conn_close = true; - ev_io_set(&iob, iob.fd, EV_WRITE); + iob.setWatchFlags(out_events); } return; } @@ -78,7 +78,7 @@ void ControlConn::processFindLoad(int pktType) char badreqRep[] = { DINIT_RP_BADREQ }; if (! queuePacket(badreqRep, 1)) return; bad_conn_close = true; - ev_io_set(&iob, iob.fd, EV_WRITE); + iob.setWatchFlags(out_events); return; } @@ -154,7 +154,7 @@ void ControlConn::processStartStop(int pktType) char badreqRep[] = { DINIT_RP_BADREQ }; if (! queuePacket(badreqRep, 1)) return; bad_conn_close = true; - ev_io_set(&iob, iob.fd, EV_WRITE); + iob.setWatchFlags(out_events); return; } else { @@ -216,7 +216,7 @@ void ControlConn::processUnpinService() char badreqRep[] = { DINIT_RP_BADREQ }; if (! queuePacket(badreqRep, 1)) return; bad_conn_close = true; - ev_io_set(&iob, iob.fd, EV_WRITE); + iob.setWatchFlags(out_events); return; } else { @@ -283,7 +283,7 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept pkt += wr; size -= wr; } - ev_io_set(&iob, iob.fd, EV_READ | EV_WRITE); + iob.setWatchFlags(in_events | out_events); } // Create a vector out of the (remaining part of the) packet: @@ -302,7 +302,7 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept delete this; } else { - ev_io_set(&iob, iob.fd, EV_WRITE); + iob.setWatchFlags(out_events); } return false; } @@ -337,7 +337,7 @@ bool ControlConn::queuePacket(std::vector &&pkt) noexcept } outpkt_index = wr; } - ev_io_set(&iob, iob.fd, EV_READ | EV_WRITE); + iob.setWatchFlags(in_events | out_events); } try { @@ -355,7 +355,7 @@ bool ControlConn::queuePacket(std::vector &&pkt) noexcept delete this; } else { - ev_io_set(&iob, iob.fd, EV_WRITE); + iob.setWatchFlags(out_events); } return false; } @@ -403,7 +403,7 @@ bool ControlConn::dataReady() noexcept // TODO log error? // TODO error response? bad_conn_close = true; - ev_io_set(&iob, iob.fd, EV_WRITE); + iob.setWatchFlags(out_events); } return false; @@ -446,7 +446,7 @@ void ControlConn::sendData() noexcept outpkt_index = 0; if (outbuf.empty() && ! oom_close) { if (! bad_conn_close) { - ev_io_set(&iob, iob.fd, EV_READ); + iob.setWatchFlags(in_events); } else { delete this; @@ -458,7 +458,7 @@ void ControlConn::sendData() noexcept ControlConn::~ControlConn() noexcept { close(iob.fd); - ev_io_stop(loop, &iob); + iob.deregisterWatch(loop); // Clear service listeners for (auto p : serviceKeyMap) { diff --git a/src/control.h b/src/control.h index 9f10fed..bf50be5 100644 --- a/src/control.h +++ b/src/control.h @@ -5,9 +5,11 @@ #include #include #include +#include #include -#include + +#include "dasync.h" #include "dinit-log.h" #include "control-cmds.h" @@ -16,13 +18,17 @@ // Control connection for dinit +using namespace dasync; +using EventLoop_t = EventLoop; + // TODO: Use the input buffer as a circular buffer, instead of chomping data from // the front using a data move. -// forward-declaration of callback: -static void control_conn_cb(struct ev_loop * loop, ev_io * w, int revents); - class ControlConn; +class ControlConnWatcher; + +// forward-declaration of callback: +static void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents); // Pointer to the control connection that is listening for rollback completion extern ControlConn * rollback_handler_conn; @@ -44,12 +50,32 @@ extern int active_control_conns; class ServiceSet; class ServiceRecord; +class ControlConnWatcher : public PosixFdWatcher +{ + Rearm gotEvent(EventLoop_t * loop, int fd, int flags) override + { + control_conn_cb(loop, this, flags); + return Rearm::REARM; + } + + public: + int fd; // TODO this is already stored, find a better way to access it. + + using PosixFdWatcher::setWatchFlags; + + void registerWith(EventLoop_t *loop, int fd, int flags) + { + this->fd = fd; + PosixFdWatcher::registerWith(loop, fd, flags); + } +}; + class ControlConn : private ServiceListener { - friend void control_conn_cb(struct ev_loop *, ev_io *, int); + friend void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents); - struct ev_io iob; - struct ev_loop *loop; + ControlConnWatcher iob; + EventLoop_t *loop; ServiceSet *service_set; bool bad_conn_close = false; // close when finished output? @@ -122,7 +148,8 @@ class ControlConn : private ServiceListener { bad_conn_close = true; oom_close = true; - ev_io_set(&iob, iob.fd, EV_WRITE); + iob.setWatchFlags(out_events); + //ev_io_set(&iob, iob.fd, EV_WRITE); } // Process service event broadcast. @@ -155,11 +182,12 @@ class ControlConn : private ServiceListener } public: - ControlConn(struct ev_loop * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0) + ControlConn(EventLoop_t * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0) { - ev_io_init(&iob, control_conn_cb, fd, EV_READ); - iob.data = this; - ev_io_start(loop, &iob); + //ev_io_init(&iob, control_conn_cb, fd, EV_READ); + //iob.data = this; + //ev_io_start(loop, &iob); + iob.registerWith(loop, fd, in_events); active_control_conns++; } @@ -170,16 +198,17 @@ class ControlConn : private ServiceListener }; -static void control_conn_cb(struct ev_loop * loop, ev_io * w, int revents) +static void control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents) { - ControlConn *conn = (ControlConn *) w->data; - if (revents & EV_READ) { + char * cc_addr = (reinterpret_cast(watcher)) - offsetof(ControlConn, iob); + ControlConn *conn = reinterpret_cast(cc_addr); + if (revents & in_events) { if (conn->dataReady()) { // ControlConn was deleted return; } } - if (revents & EV_WRITE) { + if (revents & out_events) { conn->sendData(); } } diff --git a/src/dasync/dasync-aen.h b/src/dasync/dasync-aen.h new file mode 100644 index 0000000..8dae802 --- /dev/null +++ b/src/dasync/dasync-aen.h @@ -0,0 +1,440 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace dasync { + +// Event type bits +constexpr unsigned int in_events = 1; +constexpr unsigned int out_events = 2; +constexpr unsigned int err_events = 4; + +constexpr unsigned int one_shot = 8; + + +template class EpollLoop; + +class EpollTraits +{ + template friend class EpollLoop; + + public: + + class SigInfo + { + template friend class EpollLoop; + + struct signalfd_siginfo info; + + public: + int get_signo() { return info.ssi_signo; } + int get_sicode() { return info.ssi_code; } + int get_siint() { return info.ssi_int; } + int get_ssiptr() { return info.ssi_ptr; } + int get_ssiaddr() { return info.ssi_addr; } + + void set_signo(int signo) { info.ssi_signo = signo; } + }; + + class FD_r; + + // File descriptor optional storage. If the mechanism can return the file descriptor, this + // class will be empty, otherwise it can hold a file descriptor. + class FD_s { + friend class FD_r; + + // Epoll doesn't return the file descriptor (it can, but it can't return both file + // descriptor and user data). + int fd; + }; + + // File descriptor reference (passed to event callback). If the mechanism can return the + // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor + // must be stored in an FD_s instance. + class FD_r { + public: + int getFd(FD_s ss) + { + return ss.fd; + } + }; +}; + + +template class EpollLoop : public Base +{ + int epfd; // epoll fd + int sigfd; // signalfd fd; -1 if not initialised + sigset_t sigmask; + + std::unordered_map sigdataMap; + + // Base contains: + // lock - a lock that can be used to protect internal structure. + // receive*() methods will be called with lock held. + // receiveSignal(SigInfo &, user *) noexcept + // receiveFdEvent(FD_r, user *, int flags) noexcept + + using SigInfo = EpollTraits::SigInfo; + using FD_r = typename EpollTraits::FD_r; + + void processEvents(epoll_event *events, int r) + { + std::lock_guard guard(Base::lock); + + for (int i = 0; i < r; i++) { + void * ptr = events[i].data.ptr; + + if (ptr == &sigfd) { + // Signal + SigInfo siginfo; + while (true) { + int r = read(sigfd, &siginfo.info, sizeof(siginfo.info)); + if (r == -1) break; + if (siginfo.get_signo() != SIGCHLD) { + // TODO remove the special exception for SIGCHLD? + sigdelset(&sigmask, siginfo.get_signo()); + } + auto iter = sigdataMap.find(siginfo.get_signo()); + if (iter != sigdataMap.end()) { + void *userdata = (*iter).second; + Base::receiveSignal(siginfo, userdata); + } + } + signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); + } + else { + int flags = 0; + (events[i].events & EPOLLIN) && (flags |= in_events); + (events[i].events & EPOLLHUP) && (flags |= in_events); + (events[i].events & EPOLLOUT) && (flags |= out_events); + (events[i].events & EPOLLERR) && (flags |= err_events); + Base::receiveFdEvent(*this, FD_r(), ptr, flags); + } + } + } + + public: + + /** + * EpollLoop constructor. + * + * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised. + */ + EpollLoop() : sigfd(-1) + { + epfd = epoll_create1(EPOLL_CLOEXEC); + if (epfd == -1) { + throw std::system_error(errno, std::system_category()); + } + sigemptyset(&sigmask); + } + + ~EpollLoop() + { + close(epfd); + if (sigfd != -1) { + close(sigfd); + } + } + + // flags: in_events | out_events + void addFdWatch(int fd, void *userdata, int flags) + { + struct epoll_event epevent; + // epevent.data.fd = fd; + epevent.data.ptr = userdata; + epevent.events = 0; + + if (flags & one_shot) { + epevent.events = EPOLLONESHOT; + } + if (flags & in_events) { + epevent.events |= EPOLLIN; + } + if (flags & out_events) { + epevent.events |= EPOLLOUT; + } + + if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epevent) == -1) { + throw new std::system_error(errno, std::system_category()); + } + } + + void removeFdWatch(int fd) + { + epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr); + } + + void removeFdWatch_nolock(int fd) + { + removeFdWatch(fd); + } + + // Note this will *replace* the old flags with the new, that is, + // it can enable *or disable* read/write events. + void enableFdWatch(int fd, void *userdata, int flags) + { + struct epoll_event epevent; + // epevent.data.fd = fd; + epevent.data.ptr = userdata; + epevent.events = 0; + + if (flags & one_shot) { + epevent.events = EPOLLONESHOT; + } + if (flags & in_events) { + epevent.events |= EPOLLIN; + } + if (flags & out_events) { + epevent.events |= EPOLLOUT; + } + + if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) { + throw new std::system_error(errno, std::system_category()); + } + } + + void enableFdWatch_nolock(int fd, void *userdata, int flags) + { + enableFdWatch(fd, userdata, flags); + } + + void disableFdWatch(int fd) + { + struct epoll_event epevent; + // epevent.data.fd = fd; + epevent.data.ptr = nullptr; + epevent.events = 0; + + // Epoll documentation says that hangup will still be reported, need to check + // whether this is really the case. Suspect it is really only the case if + // EPOLLIN is set. + if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) { + throw new std::system_error(errno, std::system_category()); + } + } + + // Note signal should be masked before call. + void addSignalWatch(int signo, void *userdata) + { + std::lock_guard guard(Base::lock); + + sigdataMap[signo] = userdata; + + // Modify the signal fd to watch the new signal + bool was_no_sigfd = (sigfd == -1); + sigaddset(&sigmask, signo); + sigfd = signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); + if (sigfd == -1) { + throw new std::system_error(errno, std::system_category()); + } + + if (was_no_sigfd) { + // Add the signalfd to the epoll set. + struct epoll_event epevent; + epevent.data.ptr = &sigfd; + epevent.events = EPOLLIN; + // No need for EPOLLONESHOT - we can pull the signals out + // as we see them. + if (epoll_ctl(epfd, EPOLL_CTL_ADD, sigfd, &epevent) == -1) { + close(sigfd); + throw new std::system_error(errno, std::system_category()); + } + } + } + + // Note, called with lock held: + void rearmSignalWatch_nolock(int signo) noexcept + { + sigaddset(&sigmask, signo); + signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); + } + + void removeSignalWatch_nolock(int signo) noexcept + { + sigdelset(&sigmask, signo); + signalfd(sigfd, &sigmask, 0); + } + + void removeSignalWatch(int signo) noexcept + { + std::lock_guard guard(Base::lock); + removeSignalWatch_nolock(signo); + } + + // If events are pending, process an unspecified number of them. + // If no events are pending, wait until one event is received and + // process this event (and possibly any other events received + // simultaneously). + // If processing an event removes a watch, there is a possibility + // that the watched event will still be reported (if it has + // occurred) before pullEvents() returns. + // + // do_wait - if false, returns immediately if no events are + // pending. + void pullEvents(bool do_wait) + { + epoll_event events[16]; + int r = epoll_wait(epfd, events, 16, do_wait ? -1 : 0); + if (r == -1 || r == 0) { + // signal or no events + return; + } + + processEvents(events, r); + } + + // If events are pending, process one of them. + // If no events are pending, wait until one event is received and + // process this event. + // + // do_wait - if false, returns immediately if no events are + // pending. + void pullOneEvent(bool do_wait) + { + epoll_event events[1]; + int r = epoll_wait(epfd, events, 1, do_wait ? -1 : 0); + if (r == -1 || r == 0) { + // signal or no events + return; + } + + processEvents(events, r); + } + + // Interrupt any current poll operation (pullEvents/pullOneEvent), causing + // it to to return immediately. + void interruptWait() + { + // TODO + } +}; + +// Map of pid_t to void *, with possibility of reserving entries so that mappings can +// be later added with no danger of allocator exhaustion (bad_alloc). +class pid_map +{ + using pair = std::pair; + std::unordered_map base_map; + std::vector backup_vector; + + // Number of entries in backup_vector that are actually in use (as opposed + // to simply reserved): + int backup_size = 0; + + public: + using entry = std::pair; + + entry get(pid_t key) noexcept + { + auto it = base_map.find(key); + if (it == base_map.end()) { + // Not in map; look in vector + for (int i = 0; i < backup_size; i++) { + if (backup_vector[i].first == key) { + return entry(true, backup_vector[i].second); + } + } + + return entry(false, nullptr); + } + + return entry(true, it->second); + } + + entry erase(pid_t key) noexcept + { + auto iter = base_map.find(key); + if (iter != base_map.end()) { + entry r(true, iter->second); + base_map.erase(iter); + return r; + } + for (int i = 0; i < backup_size; i++) { + if (backup_vector[i].first == key) { + entry r(true, backup_vector[i].second); + backup_vector.erase(backup_vector.begin() + i); + return r; + } + } + return entry(false, nullptr); + } + + // Throws bad_alloc on reservation failure + void reserve() + { + backup_vector.resize(backup_vector.size() + 1); + } + + void add(pid_t key, void *val) // throws std::bad_alloc + { + base_map[key] = val; + } + + void add_from_reserve(pid_t key, void *val) noexcept + { + try { + base_map[key] = val; + backup_vector.resize(backup_vector.size() - 1); + } + catch (std::bad_alloc &) { + // We couldn't add into the map, use the reserve: + backup_vector[backup_size++] = pair(key, val); + } + } +}; + +template class ChildProcEvents : public Base +{ + private: + pid_map child_waiters; + + using SigInfo = typename Base::SigInfo; + + protected: + void receiveSignal(SigInfo &siginfo, void *userdata) + { + if (siginfo.get_signo() == SIGCHLD) { + int status; + pid_t child; + while ((child = waitpid(-1, &status, WNOHANG)) > 0) { + pid_map::entry ent = child_waiters.erase(child); + if (ent.first) { + Base::receiveChildStat(child, status, ent.second); + } + } + } + else { + Base::receiveSignal(siginfo, userdata); + } + } + + public: + void reserveChildWatch() + { + child_waiters.reserve(); + } + + void addChildWatch(pid_t child, void *val) + { + child_waiters.add(child, val); + } + + void addReservedChildWatch(pid_t child, void *val) noexcept + { + child_waiters.add_from_reserve(child, val); + } +}; + +} // end namespace diff --git a/src/dasync/dasync.h b/src/dasync/dasync.h new file mode 100644 index 0000000..d9008d7 --- /dev/null +++ b/src/dasync/dasync.h @@ -0,0 +1,858 @@ +#ifndef DASYNC_H_INCLUDED +#define DASYNC_H_INCLUDED + +#include "dasync-aen.h" + +#include +#include +#include +#include + +#include "dmutex.h" + + + +// TODO consider using atomic variables instead of explicit locking where appropriate + +// Allow optimisation of empty classes by including this in the body: +// May be included as the last entry for a class which is only +// _potentially_ empty. + +/* +#ifdef __GNUC__ +#ifdef __clang__ +#define EMPTY_BODY private: char empty_fill[0]; +#else +#define EMPTY_BODY private: char empty_fill[0]; +#endif +#else +#define EMPTY_BODY +#endif +*/ + +namespace dasync { + + +/** + * Values for rearm/disarm return from event handlers + */ +enum class Rearm +{ + /** Re-arm the event watcher so that it receives further events */ + REARM, + /** Disarm the event watcher so that it receives no further events, until it is re-armed explicitly */ + DISARM, + /** Remove the event watcher (and call "removed" callback) */ + REMOVE, + /** Leave in current state */ + NOOP +// TODO: add a REMOVED option, which means, "I removed myself, DON'T TOUCH ME" +}; + + +// Forward declarations: +template class EventLoop; +template class PosixFdWatcher; +template class PosixSignalWatcher; +template class PosixChildWatcher; + +// Information about a received signal. +// This is essentially a wrapper for the POSIX siginfo_t; its existence allows for mechanisms that receive +// equivalent signal information in a different format (eg signalfd on Linux). +using SigInfo = EpollTraits::SigInfo; + +namespace dprivate { + // (non-public API) + + enum class WatchType + { + SIGNAL, + FD, + CHILD, + SECONDARYFD + }; + + template class EventDispatch; + + // For FD watchers: + // Use this watch flag to indicate that in and out events should be reported separately, + // that is, watcher should not be disabled until all watched event types are queued. + constexpr static int multi_watch = 4; + + // Represents a queued event notification + class BaseWatcher + { + template friend class EventDispatch; + template friend class dasync::EventLoop; + + protected: + WatchType watchType; + int active : 1; + int deleteme : 1; + + BaseWatcher * next; + + public: + BaseWatcher(WatchType wt) noexcept : watchType(wt), active(0), deleteme(0), next(nullptr) { } + + virtual ~BaseWatcher() noexcept { } + + // Called when the watcher has been removed. + // It is guaranteed by the caller that: + // - the dispatch method is not currently running + // - the dispatch method will not be called. + virtual void watchRemoved() noexcept + { + // TODO this "delete" behaviour could be dependent on a flag, perhaps? + // delete this; + } + }; + + // Base signal event - not part of public API + template + class BaseSignalWatcher : public BaseWatcher + { + template friend class EventDispatch; + friend class dasync::EventLoop; + + protected: + SigInfo siginfo; + BaseSignalWatcher() : BaseWatcher(WatchType::SIGNAL) { } + + public: + typedef SigInfo &SigInfo_p; + + virtual Rearm gotSignal(EventLoop * eloop, int signo, SigInfo_p siginfo) = 0; + }; + + template + class BaseFdWatcher : public BaseWatcher + { + template friend class EventDispatch; + friend class dasync::EventLoop; + + protected: + int watch_fd; + int watch_flags; + int event_flags; + + BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { } + + public: + virtual Rearm gotEvent(EventLoop * eloop, int fd, int flags) = 0; + }; + + template + class BaseBidiFdWatcher : public BaseFdWatcher + { + template friend class EventDispatch; + friend class dasync::EventLoop; + + // The main instance is the "input" watcher only; we keep a secondary watcher + // with a secondary set of flags for the "output" watcher: + BaseWatcher outWatcher = BaseWatcher(WatchType::SECONDARYFD); + + // This should never actually get called: + Rearm gotEvent(EventLoop * eloop, int fd, int flags) + { + return Rearm::REARM; // should not be reachable. + }; + + protected: + int read_removed : 1; // read watch removed? + int write_removed : 1; // write watch removed? + + public: + virtual Rearm readReady(EventLoop * eloop, int fd) = 0; + virtual Rearm writeReady(EventLoop * eloop, int fd) = 0; + }; + + template + class BaseChildWatcher : public BaseWatcher + { + template friend class EventDispatch; + friend class dasync::EventLoop; + + protected: + pid_t watch_pid; + int child_status; + + BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { } + + public: + virtual void gotTermStat(EventLoop * eloop, pid_t child, int status) = 0; + }; + + // Classes for implementing a fair(ish) wait queue. + // A queue node can be signalled when it reaches the head of + // the queue. + + template class waitqueue; + template class waitqueue_node; + + // Select an appropriate conditiona variable type for a mutex: + // condition_variable if mutex is std::mutex, or condition_variable_any + // otherwise. + template class condvarSelector; + + template <> class condvarSelector + { + public: + typedef std::condition_variable condvar; + }; + + template class condvarSelector + { + public: + typedef std::condition_variable_any condvar; + }; + + template <> class waitqueue_node + { + // Specialised waitqueue_node for NullMutex. + // TODO can this be reduced to 0 data members? + friend class waitqueue; + waitqueue_node * next = nullptr; + + public: + void wait(std::unique_lock &ul) { } + void signal() { } + }; + + template class waitqueue_node + { + typename condvarSelector::condvar condvar; + friend class waitqueue; + waitqueue_node * next = nullptr; + + public: + void signal() + { + condvar.notify_one(); + } + + void wait(std::unique_lock &mutex_lock) + { + condvar.wait(mutex_lock); + } + }; + + template class waitqueue + { + waitqueue_node * tail = nullptr; + waitqueue_node * head = nullptr; + + public: + waitqueue_node * unqueue() + { + head = head->next; + return head; + } + + waitqueue_node * getHead() + { + return head; + } + + void queue(waitqueue_node *node) + { + if (tail) { + tail->next = node; + } + else { + head = node; + } + } + }; + + // This class serves as the base class (mixin) for the AEN mechanism class. + // Note that EventDispatch, here, and EventLoop (below) are really two sides of one coin; + // they do not work independently. The mixin pattern that we use to avoid dynamic dispatch + // forces them to be two seperate classes, however. + // + // The EventDispatch class maintains the queued event data structures. It inserts watchers + // into the queue when eventes are received (receiveXXX methods). + template class EventDispatch : public Traits + { + friend class EventLoop; + + // queue data structure/pointer + BaseWatcher * first; + + using BaseSignalWatcher = dasync::dprivate::BaseSignalWatcher; + using BaseFdWatcher = dasync::dprivate::BaseFdWatcher; + using BaseBidiFdWatcher = dasync::dprivate::BaseBidiFdWatcher; + using BaseChildWatcher = dasync::dprivate::BaseChildWatcher; + + void queueWatcher(BaseWatcher *bwatcher) + { + // TODO + // We can't allow a queued entry to be deleted (due to the single-linked-list used for the queue) + // so for now, I'll set it active; but this prevents it being deleted until we can next + // process events, so once we have a proper linked list or better structure should probably + // remove this: + bwatcher->active = true; + + // Put in queue: + BaseWatcher * prev_first = first; + first = bwatcher; + bwatcher->next = prev_first; + } + + protected: + T_Mutex lock; + + void receiveSignal(typename Traits::SigInfo & siginfo, void * userdata) + { + BaseSignalWatcher * bwatcher = static_cast(userdata); + bwatcher->siginfo = siginfo; + queueWatcher(bwatcher); + } + + template + void receiveFdEvent(T &loop_mech, typename Traits::FD_r fd_r, void * userdata, int flags) + { + BaseFdWatcher * bfdw = static_cast(userdata); + + bfdw->event_flags |= flags; + + BaseWatcher * bwatcher = bfdw; + + bool is_multi_watch = bfdw->watch_flags & multi_watch; + if (is_multi_watch) { + BaseBidiFdWatcher *bbdw = static_cast(bwatcher); + if (flags & in_events && flags & out_events) { + // Queue the secondary watcher first: + queueWatcher(&bbdw->outWatcher); + } + else if (flags & out_events) { + // Use the secondary watcher for queueing: + bwatcher = &(bbdw->outWatcher); + } + } + + queueWatcher(bwatcher); + + if (is_multi_watch && bfdw->event_flags != bfdw->watch_flags) { + // We need to re-enable the other channel now: + loop_mech.enableFdWatch_nolock(bfdw->watch_fd, userdata, + (bfdw->watch_flags & ~(bfdw->event_flags)) | one_shot); + } + } + + void receiveChildStat(pid_t child, int status, void * userdata) + { + BaseChildWatcher * watcher = static_cast(userdata); + watcher->child_status = status; + queueWatcher(watcher); + } + + // TODO is this needed?: + BaseWatcher * pullEvent() + { + if (first) { + BaseWatcher * r = first; + first = first->next; + return r; + } + return nullptr; + } + + void issueDelete(BaseWatcher *watcher) noexcept + { + // This is only called when the attention lock is held, so if the watcher is not + // active/queued now, it cannot become active during execution of this function. + + lock.lock(); + + if (watcher->active) { + // If the watcher is active, set deleteme true; the watcher will be removed + // at the end of current processing (i.e. when active is set false). + watcher->deleteme = true; + } + else { + // Actually do the delete. + watcher->watchRemoved(); + } + + lock.unlock(); + } + }; +} + + +template class EventLoop +{ + friend class PosixFdWatcher; + friend class PosixSignalWatcher; + friend class PosixChildWatcher; + + template using EventDispatch = dprivate::EventDispatch; + template using waitqueue = dprivate::waitqueue; + template using waitqueue_node = dprivate::waitqueue_node; + using BaseWatcher = dprivate::BaseWatcher; + using BaseSignalWatcher = dprivate::BaseSignalWatcher; + using BaseFdWatcher = dprivate::BaseFdWatcher; + using BaseBidiFdWatcher = dprivate::BaseBidiFdWatcher; + using BaseChildWatcher = dprivate::BaseChildWatcher; + using WatchType = dprivate::WatchType; + + EpollLoop>> loop_mech; + + // There is a complex problem with most asynchronous event notification mechanisms + // when used in a multi-threaded environment. Generally, a file descriptor or other + // event type that we are watching will be associated with some data used to manage + // that event source. For example a web server needs to maintain information about + // each client connection, such as the state of the connection (what protocol version + // has been negotiated, etc; if a transfer is taking place, what file is being + // transferred etc). + // + // However, sometimes we want to remove an event source (eg webserver wants to drop + // a connection) and delete the associated data. The problem here is that it is + // difficult to be sure when it is ok to actually remove the data, since when + // requesting to unwatch the source in one thread it is still possible that an + // event from that source is just being reported to another thread (in which case + // the data will be needed). + // + // To solve that, we: + // - allow only one thread to poll for events at a time, using a lock + // - use the same lock to prevent polling, if we want to unwatch an event source + // - generate an event to interrupt any polling that may already be occurring in + // another thread + // - mark handlers as active if they are currently executing, and + // - when removing an active handler, simply set a flag which causes it to be + // removed once the current processing is finished, rather than removing it + // immediately. + // + // In particular the lock mechanism for preventing multiple threads polling and + // for allowing polling to be interrupted is tricky. We can't use a simple mutex + // since there is significant chance that it will be highly contended and there + // are no guarantees that its acquisition will be fair. In particular, we don't + // want a thread that is trying to unwatch a source being starved while another + // thread polls the event source. + // + // So, we use two wait queues protected by a single mutex. The "attn_waitqueue" + // (attention queue) is the high-priority queue, used for threads wanting to + // unwatch event sources. The "wait_waitquueue" is the queue used by threads + // that wish to actually poll for events. + // - The head of the "attn_waitqueue" is always the holder of the lock + // - Therefore, a poll-waiter must be moved from the wait_waitqueue to the + // attn_waitqueue to actually gain the lock. This is only done if the + // attn_waitqueue is otherwise empty. + // - The mutex only protects manipulation of the wait queues, and so should not + // be highly contended. + + T_Mutex wait_lock; // wait lock, used to prevent multiple threads from waiting + // on the event queue simultaneously. + waitqueue attn_waitqueue; + waitqueue wait_waitqueue; + + + void registerSignal(BaseSignalWatcher *callBack, int signo) + { + loop_mech.addSignalWatch(signo, callBack); + } + + void deregister(BaseSignalWatcher *callBack, int signo) noexcept + { + loop_mech.removeSignalWatch(signo); + + waitqueue_node qnode; + getAttnLock(qnode); + + EventDispatch & ed = (EventDispatch &) loop_mech; + ed.issueDelete(callBack); + + releaseLock(qnode); + } + + void registerFd(BaseFdWatcher *callback, int fd, int eventmask) + { + loop_mech.addFdWatch(fd, callback, eventmask); + } + + void deregister(BaseFdWatcher *callback, int fd) + { + loop_mech.removeFdWatch(fd); + + waitqueue_node qnode; + getAttnLock(qnode); + + EventDispatch & ed = (EventDispatch &) loop_mech; + ed.issueDelete(callback); + + releaseLock(qnode); + } + + void reserveChildWatch(BaseChildWatcher *callBack) + { + loop_mech.addSignalWatch(SIGCHLD, nullptr); // TODO remove this kludge + loop_mech.reserveChildWatch(); + } + + void registerChild(BaseChildWatcher *callBack, pid_t child) + { + loop_mech.addSignalWatch(SIGCHLD, nullptr); // TODO remove this kludge + loop_mech.addChildWatch(child, callBack); + } + + void registerReservedChild(BaseChildWatcher *callBack, pid_t child) noexcept + { + loop_mech.addReservedChildWatch(child, callBack); + } + + // Acquire the attention lock (when held, ensures that no thread is polling the AEN + // mechanism). + void getAttnLock(waitqueue_node &qnode) + { + std::unique_lock ulock(wait_lock); + attn_waitqueue.queue(&qnode); + if (attn_waitqueue.getHead() != &qnode) { + loop_mech.interruptWait(); + while (attn_waitqueue.getHead() != &qnode) { + qnode.wait(ulock); + } + } + } + + // Acquire the poll-wait lock (to be held when polling the AEN mechanism; lower + // priority than the attention lock). + void getPollwaitLock(waitqueue_node &qnode) + { + std::unique_lock ulock(wait_lock); + if (attn_waitqueue.getHead() == nullptr) { + // Queue is completely empty: + attn_waitqueue.queue(&qnode); + } + else { + wait_waitqueue.queue(&qnode); + } + + while (attn_waitqueue.getHead() != &qnode) { + qnode.wait(ulock); + } + } + + // Release the poll-wait/attention lock. + void releaseLock(waitqueue_node &qnode) + { + std::unique_lock ulock(wait_lock); + waitqueue_node * nhead = attn_waitqueue.unqueue(); + if (nhead != nullptr) { + nhead->signal(); + } + else { + nhead = wait_waitqueue.getHead(); + if (nhead != nullptr) { + attn_waitqueue.queue(nhead); + nhead->signal(); + } + } + } + + void processSignalRearm(BaseSignalWatcher * bsw, Rearm rearmType) + { + // Called with lock held + if (rearmType == Rearm::REARM) { + loop_mech.rearmSignalWatch_nolock(bsw->siginfo.get_signo()); + } + else if (rearmType == Rearm::REMOVE) { + loop_mech.removeSignalWatch_nolock(bsw->siginfo.get_signo()); + } + } + + Rearm processFdRearm(BaseFdWatcher * bfw, Rearm rearmType, bool is_multi_watch) + { + // Called with lock held + if (is_multi_watch) { + BaseBidiFdWatcher * bdfw = static_cast(bfw); + + if (rearmType == Rearm::REMOVE) { + bdfw->read_removed = 1; + bdfw->watch_flags &= ~in_events; + if (! bdfw->write_removed) { + return Rearm::NOOP; + } + else { + // both removed: actually remove + loop_mech.removeFdWatch_nolock(bdfw->watch_fd); + } + } + else if (rearmType == Rearm::DISARM) { + // Nothing more to do + } + else if (rearmType == Rearm::REARM) { + bdfw->watch_flags |= in_events; + loop_mech.enableFdWatch_nolock(bdfw->watch_fd, + static_cast(bdfw), + (bdfw->watch_flags & (in_events | out_events)) | one_shot); + } + return rearmType; + } + else { + if (rearmType == Rearm::REARM) { + loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw, + (bfw->watch_flags & (in_events | out_events)) | one_shot); + } + else if (rearmType == Rearm::REMOVE) { + loop_mech.removeFdWatch_nolock(bfw->watch_fd); + } + return rearmType; + } + } + + Rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, Rearm rearmType) + { + // Called with lock held + if (rearmType == Rearm::REMOVE) { + bdfw->write_removed = 1; + bdfw->watch_flags &= ~out_events; + if (! bdfw->read_removed) { + return Rearm::NOOP; + } + else { + // both removed: actually remove + loop_mech.removeFdWatch_nolock(bdfw->watch_fd); + } + } + else if (rearmType == Rearm::DISARM) { + // Nothing more to do + } + else if (rearmType == Rearm::REARM) { + bdfw->watch_flags |= out_events; + loop_mech.enableFdWatch_nolock(bdfw->watch_fd, + static_cast(bdfw), + (bdfw->watch_flags & (in_events | out_events)) | one_shot); + } + return rearmType; + } + + bool processEvents() noexcept + { + EventDispatch & ed = (EventDispatch &) loop_mech; + ed.lock.lock(); + + // So this pulls *all* currently pending events and processes them in the current thread. + // That's probably good for throughput, but maybe the behavior should be configurable. + + BaseWatcher * pqueue = ed.first; + ed.first = nullptr; + bool active = false; + + BaseWatcher * prev = nullptr; + for (BaseWatcher * q = pqueue; q != nullptr; q = q->next) { + if (q->deleteme) { + q->watchRemoved(); + if (prev) { + prev->next = q->next; + } + else { + pqueue = q->next; + } + } + else { + q->active = true; + active = true; + } + } + + ed.lock.unlock(); + + while (pqueue != nullptr) { + Rearm rearmType = Rearm::NOOP; + bool is_multi_watch = false; + BaseBidiFdWatcher *bbfw = nullptr; + // (Above variables are initialised only to silence compiler warnings). + + // Note that we select actions based on the type of the watch, as determined by the watchType + // member. In some ways this screams out for polmorphism; a virtual function could be overridden + // by each of the watcher types. I've instead used switch/case because I think it will perform + // slightly better without the overhead of a virtual function dispatch, but it's got to be a + // close call; I might be guilty of premature optimisation here. + + switch (pqueue->watchType) { + case WatchType::SIGNAL: { + BaseSignalWatcher *bsw = static_cast(pqueue); + rearmType = bsw->gotSignal(this, bsw->siginfo.get_signo(), bsw->siginfo); + break; + } + case WatchType::FD: { + BaseFdWatcher *bfw = static_cast(pqueue); + is_multi_watch = bfw->watch_flags & dprivate::multi_watch; + if (is_multi_watch) { + // The primary watcher for a multi-watch watcher is queued for + // read events. + BaseBidiFdWatcher *bdfw = static_cast(bfw); + rearmType = bdfw->readReady(this, bfw->watch_fd); + } + else { + rearmType = bfw->gotEvent(this, bfw->watch_fd, bfw->event_flags); + } + break; + } + case WatchType::CHILD: { + BaseChildWatcher *bcw = static_cast(pqueue); + bcw->gotTermStat(this, bcw->watch_pid, bcw->child_status); + // Child watches automatically remove: + rearmType = Rearm::REMOVE; + break; + } + case WatchType::SECONDARYFD: { + // first construct a pointer to the main watcher: + char * rp = (char *)pqueue; + rp -= offsetof(BaseBidiFdWatcher, outWatcher); + bbfw = (BaseBidiFdWatcher *)rp; + rearmType = bbfw->writeReady(this, bbfw->watch_fd); + break; + } + default: ; + } + + ed.lock.lock(); + + pqueue->active = false; + if (pqueue->deleteme) { + // We don't want a watch that is marked "deleteme" to re-arm itself. + // NOOP flags that the state is managed externally, so we don't adjust that. + if (rearmType != Rearm::NOOP) { + rearmType = Rearm::REMOVE; + } + } + switch (pqueue->watchType) { + case WatchType::SIGNAL: + processSignalRearm(static_cast(pqueue), rearmType); + break; + case WatchType::FD: + rearmType = processFdRearm(static_cast(pqueue), rearmType, is_multi_watch); + break; + case WatchType::SECONDARYFD: + processSecondaryRearm(bbfw, rearmType); + break; + default: ; + } + + if (pqueue->deleteme) rearmType = Rearm::REMOVE; // makes the watchRemoved() callback get called. + + ed.lock.unlock(); + + if (rearmType == Rearm::REMOVE) { + pqueue->watchRemoved(); + } + + pqueue = pqueue->next; + } + + return active; + } + + + public: + void run() noexcept + { + while (! processEvents()) { + waitqueue_node qnode; + + // We only allow one thread to poll the mechanism at any time, since otherwise + // removing event watchers is a nightmare beyond comprehension. + getPollwaitLock(qnode); + + // Pull events from the AEN mechanism and insert them in our internal queue: + loop_mech.pullEvents(true); + + // Now release the wait lock: + releaseLock(qnode); + } + } +}; + + +typedef EventLoop NEventLoop; +typedef EventLoop TEventLoop; + +// from dasync.cc: +TEventLoop & getSystemLoop(); + +// Posix signal event watcher +template +class PosixSignalWatcher : private dprivate::BaseSignalWatcher +{ +public: + using SigInfo_p = typename dprivate::BaseSignalWatcher::SigInfo_p; + + // Register this watcher to watch the specified signal. + // If an attempt is made to register with more than one event loop at + // a time, behaviour is undefined. + inline void registerWatch(EventLoop *eloop, int signo) + { + this->deleteme = false; + this->siginfo.set_signo(signo); + eloop->registerSignal(this, signo); + } + + inline void deregisterWatch(EventLoop *eloop) noexcept + { + eloop->deregister(this, this->siginfo.get_signo()); + } + + // virtual Rearm gotSignal(EventLoop *, int signo, SigInfo_p info) = 0; +}; + +// Posix file descriptor event watcher +template +class PosixFdWatcher : private dprivate::BaseFdWatcher +{ + protected: + + // Set the types of event to watch. May not be supported for all mechanisms. + // Only safe to call from within the callback handler (gotEvent). + void setWatchFlags(int newFlags) + { + this->watch_flags = newFlags; + } + + public: + + void registerWith(EventLoop *eloop, int fd, int flags) + { + this->deleteme = false; + this->watch_fd = fd; + this->watch_flags = flags; + eloop->registerFd(this, fd, flags); + } + + void deregisterWatch(EventLoop *eloop) noexcept + { + eloop->deregister(this, this->watch_fd); + } + + // virtual Rearm gotEvent(EventLoop *, int fd, int flags) = 0; +}; + +// Posix child process event watcher +template +class PosixChildWatcher : private dprivate::BaseChildWatcher +{ + public: + void reserveWith(EventLoop *eloop) + { + eloop->reserveChildWatch(); + } + + void registerWith(EventLoop *eloop, pid_t child) + { + this->deleteme = false; + this->watch_pid = child; + eloop->registerChild(this, child); + } + + void registerReserved(EventLoop *eloop, pid_t child) noexcept + { + eloop->registerReservedChild(this, child); + } + + // virtual void gotTermStat(EventLoop *, pid_t child, int status) = 0; +}; + +} // namespace dasync + +#endif diff --git a/src/dasync/dmutex.h b/src/dasync/dmutex.h new file mode 100644 index 0000000..b219621 --- /dev/null +++ b/src/dasync/dmutex.h @@ -0,0 +1,61 @@ +#ifndef D_MUTEX_H_INCLUDED +#define D_MUTEX_H_INCLUDED + +//#include +#include + +namespace dasync { + +// Simple non-recursive mutex, with priority inheritance to avoid priority inversion. +/* +class DMutex +{ + private: + pthread_mutex_t mutex; + + public: + DMutex() + { + // Avoid priority inversion by using PTHREAD_PRIO_INHERIT + pthread_mutexattr_t attribs; + pthread_mutexattr_init(&attribs); + pthread_mutexattr_setprotocol(&attribs, PTHREAD_PRIO_INHERIT); + pthread_mutex_init(&mutex, &attribs); + } + + void lock() + { + pthread_mutex_lock(&mutex); + } + + void unlock() + { + pthread_mutex_unlock(&mutex); + } +}; +*/ + +using DMutex = std::mutex; + +// A "null" mutex, for which locking / unlocking actually does nothing. +class NullMutex +{ + #ifdef __GNUC__ + #ifndef __clang__ + char empty[0]; // Make class instances take up no space (gcc) + #else + char empty[0] __attribute__((unused)); // Make class instances take up no space (clang) + #endif + #endif + + public: + void lock() { } + void unlock() { } + void try_lock() { } +}; + + +} // end of namespace + + +#endif diff --git a/src/dinit-log.cc b/src/dinit-log.cc index 03c9683..64ea5af 100644 --- a/src/dinit-log.cc +++ b/src/dinit-log.cc @@ -1,14 +1,17 @@ #include #include -#include #include #include +#include "dasync.h" + #include "service.h" #include "dinit-log.h" #include "cpbuffer.h" +extern EventLoop_t eventLoop; + LogLevel log_level = LogLevel::WARN; LogLevel cons_log_level = LogLevel::WARN; static bool log_to_console = false; // whether we should output log messages to @@ -18,15 +21,19 @@ static bool log_current_line; // Whether the current line is being logged static ServiceSet *service_set = nullptr; // Reference to service set -static void log_conn_callback(struct ev_loop *loop, struct ev_io *w, int revents) noexcept; +namespace { + class BufferedLogStream; +} -class BufferedLogStream +// TODO just make this the callback, directly. +static Rearm log_conn_callback(EventLoop_t *loop, BufferedLogStream *w, int revents) noexcept; + +namespace { +class BufferedLogStream : public PosixFdWatcher { public: CPBuffer<4096> log_buffer; - struct ev_io eviocb; - // Outgoing: bool partway = false; // if we are partway throught output of a log message bool discarded = false; // if we have discarded a message @@ -40,13 +47,22 @@ class BufferedLogStream bool special = false; // currently outputting special message? char *special_buf; // buffer containing special message int msg_index; // index into special message + + int fd; void init(int fd) { - ev_io_init(&eviocb, log_conn_callback, fd, EV_WRITE); - eviocb.data = this; + //ev_io_init(&eviocb, log_conn_callback, fd, EV_WRITE); + //eviocb.data = this; + this->fd = fd; + } + + Rearm gotEvent(EventLoop_t *loop, int fd, int flags) noexcept override + { + return log_conn_callback(loop, this, flags); } }; +} // Two log streams: // (One for main log, one for console) @@ -58,7 +74,8 @@ constexpr static int DLOG_CONS = 1; // console static void release_console() { - ev_io_stop(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb); + //ev_io_stop(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb); + //log_stream[DLOG_CONS].deregisterWatch(&eventLoop); // now handled elsewhere if (! log_to_console) { int flags = fcntl(1, F_GETFL, 0); fcntl(1, F_SETFL, flags & ~O_NONBLOCK); @@ -66,9 +83,9 @@ static void release_console() } } -static void log_conn_callback(struct ev_loop * loop, ev_io * w, int revents) noexcept +static Rearm log_conn_callback(EventLoop_t * loop, BufferedLogStream * w, int revents) noexcept { - auto &log_stream = *static_cast(w->data); + auto &log_stream = *w; if (log_stream.special) { char * start = log_stream.special_buf + log_stream.msg_index; @@ -83,7 +100,7 @@ static void log_conn_callback(struct ev_loop * loop, ev_io * w, int revents) noe } else { log_stream.msg_index += r; - return; + return Rearm::REARM; } } else { @@ -91,7 +108,7 @@ static void log_conn_callback(struct ev_loop * loop, ev_io * w, int revents) noe // other error? // TODO } - return; + return Rearm::REARM; } else { // Writing from the regular circular buffer @@ -100,7 +117,7 @@ static void log_conn_callback(struct ev_loop * loop, ev_io * w, int revents) noe if (log_stream.current_index == 0) { release_console(); - return; + return Rearm::REMOVE; } char *ptr = log_stream.log_buffer.get_ptr(0); @@ -127,6 +144,7 @@ static void log_conn_callback(struct ev_loop * loop, ev_io * w, int revents) noe if (log_stream.current_index == 0 || !log_to_console) { // No more messages buffered / stop logging to console: release_console(); + return Rearm::REMOVE; } } } @@ -134,13 +152,13 @@ static void log_conn_callback(struct ev_loop * loop, ev_io * w, int revents) noe // TODO // EAGAIN / EWOULDBLOCK? // error? - return; + return Rearm::REARM; } } // We've written something by the time we get here. We could fall through to below, but // let's give other events a chance to be processed by returning now. - return; + return Rearm::REARM; } void init_log(ServiceSet *sset) noexcept @@ -163,7 +181,8 @@ void enable_console_log(bool enable) noexcept //ev_io_init(& log_stream[DLOG_CONS].eviocb, log_conn_callback, 1, EV_WRITE); log_stream[DLOG_CONS].init(STDOUT_FILENO); if (log_stream[DLOG_CONS].current_index > 0) { - ev_io_start(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb); + //ev_io_start(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb); + log_stream[DLOG_CONS].registerWith(&eventLoop, log_stream[DLOG_CONS].fd, out_events); } log_to_console = true; } @@ -173,10 +192,11 @@ void enable_console_log(bool enable) noexcept if (log_stream[DLOG_CONS].current_index > 0) { // Try to flush any messages that are currently buffered. (Console is non-blocking // so it will fail gracefully). - log_conn_callback(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb, EV_WRITE); + log_conn_callback(&eventLoop, &log_stream[DLOG_CONS], out_events); } else { release_console(); + log_stream[DLOG_CONS].deregisterWatch(&eventLoop); } } // (if we're partway through logging a message, we release the console when @@ -218,7 +238,8 @@ template static void do_log(T ... args) noexcept bool was_first = (log_stream[DLOG_CONS].current_index == 0); log_stream[DLOG_CONS].current_index += amount; if (was_first && log_to_console) { - ev_io_start(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb); + //ev_io_start(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb); + log_stream[DLOG_CONS].registerWith(&eventLoop, log_stream[DLOG_CONS].fd, out_events); } } else { diff --git a/src/dinit.cc b/src/dinit.cc index e540b84..a0e7a5f 100644 --- a/src/dinit.cc +++ b/src/dinit.cc @@ -13,8 +13,8 @@ #include #include +#include "dasync.h" #include "service.h" -#include "ev++.h" #include "control.h" #include "dinit-log.h" @@ -60,13 +60,43 @@ */ -static void sigint_reboot_cb(struct ev_loop *loop, ev_signal *w, int revents); -static void sigquit_cb(struct ev_loop *loop, ev_signal *w, int revents); -static void sigterm_cb(struct ev_loop *loop, ev_signal *w, int revents); -void open_control_socket(struct ev_loop *loop) noexcept; -void close_control_socket(struct ev_loop *loop) noexcept; +using namespace dasync; +using EventLoop_t = EventLoop; -struct ev_io control_socket_io; +EventLoop_t eventLoop = EventLoop_t(); + +// TODO remove: +//static void sigint_reboot_cb(struct ev_loop *loop, ev_signal *w, int revents); +//static void sigquit_cb(struct ev_loop *loop, ev_signal *w, int revents); +//static void sigterm_cb(struct ev_loop *loop, ev_signal *w, int revents); +static void sigint_reboot_cb(EventLoop_t *eloop) noexcept; +static void sigquit_cb(EventLoop_t *eloop) noexcept; +static void sigterm_cb(EventLoop_t *eloop) noexcept; +void open_control_socket(EventLoop_t *loop) noexcept; +void close_control_socket(EventLoop_t *loop) noexcept; + +static void control_socket_cb(EventLoop_t *loop, int fd); + +class ControlSocketWatcher : public PosixFdWatcher +{ + Rearm gotEvent(EventLoop_t * loop, int fd, int flags) + { + control_socket_cb(loop, fd); + return Rearm::REARM; + } + + public: + // TODO the fd is already stored, must we really store it again... + int fd; + + void registerWith(EventLoop_t * loop, int fd, int flags) + { + this->fd = fd; + PosixFdWatcher::registerWith(loop, fd, flags); + } +}; + +ControlSocketWatcher control_socket_io; // Variables @@ -103,6 +133,41 @@ const char * get_user_home() } +namespace { + class CallbackSignalHandler : public PosixSignalWatcher + { + public: + typedef void (*cb_func_t)(EventLoop_t *); + + private: + cb_func_t cb_func; + + public: + CallbackSignalHandler() : cb_func(nullptr) { } + CallbackSignalHandler(cb_func_t pcb_func) : cb_func(pcb_func) { } + + void setCbFunc(cb_func_t cb_func) + { + this->cb_func = cb_func; + } + + Rearm gotSignal(EventLoop_t * eloop, int signo, SigInfo_p siginfo) override + { + service_set->stop_all_services(ShutdownType::REBOOT); + return Rearm::REARM; + } + }; + + class ControlSocketWatcher : public PosixFdWatcher + { + Rearm gotEvent(EventLoop_t * loop, int fd, int flags) + { + control_socket_cb(loop, fd); + return Rearm::REARM; + } + }; +} + int main(int argc, char **argv) { using namespace std; @@ -193,11 +258,12 @@ int main(int argc, char **argv) /* Set up signal handlers etc */ /* SIG_CHILD is ignored by default: good */ - /* sigemptyset(&sigwait_set); */ - /* sigaddset(&sigwait_set, SIGCHLD); */ - /* sigaddset(&sigwait_set, SIGINT); */ - /* sigaddset(&sigwait_set, SIGTERM); */ - /* sigprocmask(SIG_BLOCK, &sigwait_set, NULL); */ + sigset_t sigwait_set; + sigemptyset(&sigwait_set); + sigaddset(&sigwait_set, SIGCHLD); + sigaddset(&sigwait_set, SIGINT); + sigaddset(&sigwait_set, SIGTERM); + sigprocmask(SIG_BLOCK, &sigwait_set, NULL); // Terminal access control signals - we block these so that dinit can't be // suspended if it writes to the terminal after some other process has claimed @@ -234,35 +300,45 @@ int main(int argc, char **argv) } // Set up signal handlers - ev_signal sigint_ev_signal; + //ev_signal sigint_ev_signal; + CallbackSignalHandler sigint_watcher; if (am_system_init) { - ev_signal_init(&sigint_ev_signal, sigint_reboot_cb, SIGINT); + //ev_signal_init(&sigint_ev_signal, sigint_reboot_cb, SIGINT); + sigint_watcher.setCbFunc(sigint_reboot_cb); } else { - ev_signal_init(&sigint_ev_signal, sigterm_cb, SIGINT); + //ev_signal_init(&sigint_ev_signal, sigterm_cb, SIGINT); + sigint_watcher.setCbFunc(sigterm_cb); } - ev_signal sigquit_ev_signal; + //ev_signal sigquit_ev_signal; + CallbackSignalHandler sigquit_watcher; if (am_system_init) { // PID 1: SIGQUIT exec's shutdown - ev_signal_init(&sigquit_ev_signal, sigquit_cb, SIGQUIT); + //ev_signal_init(&sigquit_ev_signal, sigquit_cb, SIGQUIT); + sigquit_watcher.setCbFunc(sigquit_cb); } else { // Otherwise: SIGQUIT terminates dinit - ev_signal_init(&sigquit_ev_signal, sigterm_cb, SIGQUIT); + //ev_signal_init(&sigquit_ev_signal, sigterm_cb, SIGQUIT); + sigquit_watcher.setCbFunc(sigterm_cb); } - ev_signal sigterm_ev_signal; - ev_signal_init(&sigterm_ev_signal, sigterm_cb, SIGTERM); + //ev_signal sigterm_ev_signal; + //ev_signal_init(&sigterm_ev_signal, sigterm_cb, SIGTERM); + auto sigterm_watcher = CallbackSignalHandler(sigterm_cb); /* Set up libev */ - struct ev_loop *loop = ev_default_loop(EVFLAG_AUTO /* | EVFLAG_SIGNALFD */); - ev_signal_start(loop, &sigint_ev_signal); - ev_signal_start(loop, &sigquit_ev_signal); - ev_signal_start(loop, &sigterm_ev_signal); + //struct ev_loop *loop = ev_default_loop(EVFLAG_AUTO /* | EVFLAG_SIGNALFD */); + //ev_signal_start(loop, &sigint_ev_signal); + //ev_signal_start(loop, &sigquit_ev_signal); + //ev_signal_start(loop, &sigterm_ev_signal); + sigint_watcher.registerWatch(&eventLoop, SIGINT); + sigquit_watcher.registerWatch(&eventLoop, SIGQUIT); + sigterm_watcher.registerWatch(&eventLoop, SIGTERM); // Try to open control socket (may fail due to readonly filesystem) - open_control_socket(loop); + open_control_socket(&eventLoop); #ifdef __linux__ if (am_system_init) { @@ -299,7 +375,8 @@ int main(int argc, char **argv) // Process events until all services have terminated. while (service_set->count_active_services() != 0) { - ev_loop(loop, EVLOOP_ONESHOT); + // ev_loop(loop, EVLOOP_ONESHOT); + eventLoop.run(); } ShutdownType shutdown_type = service_set->getShutdownType(); @@ -321,7 +398,7 @@ int main(int argc, char **argv) } } - close_control_socket(ev_default_loop(EVFLAG_AUTO)); + close_control_socket(&eventLoop); if (am_system_init) { if (shutdown_type == ShutdownType::CONTINUE) { @@ -357,7 +434,8 @@ int main(int argc, char **argv) // PID 1 must not actually exit, although we should never reach this point: while (true) { - ev_loop(loop, EVLOOP_ONESHOT); + // ev_loop(loop, EVLOOP_ONESHOT); + eventLoop.run(); } } @@ -365,15 +443,13 @@ int main(int argc, char **argv) } // Callback for control socket -static void control_socket_cb(struct ev_loop *loop, ev_io *w, int revents) +static void control_socket_cb(EventLoop_t *loop, int sockfd) { // TODO limit the number of active connections. Keep a tally, and disable the // control connection listening socket watcher if it gets high, and re-enable // it once it falls below the maximum. // Accept a connection - int sockfd = w->fd; - int newfd = accept4(sockfd, nullptr, nullptr, SOCK_NONBLOCK | SOCK_CLOEXEC); if (newfd != -1) { @@ -387,7 +463,7 @@ static void control_socket_cb(struct ev_loop *loop, ev_io *w, int revents) } } -void open_control_socket(struct ev_loop *loop) noexcept +void open_control_socket(EventLoop_t *loop) noexcept { if (! control_socket_open) { const char * saddrname = control_socket_path; @@ -441,16 +517,18 @@ void open_control_socket(struct ev_loop *loop) noexcept } control_socket_open = true; - ev_io_init(&control_socket_io, control_socket_cb, sockfd, EV_READ); - ev_io_start(loop, &control_socket_io); + //ev_io_init(&control_socket_io, control_socket_cb, sockfd, EV_READ); + //ev_io_start(loop, &control_socket_io); + control_socket_io.registerWith(&eventLoop, sockfd, in_events); } } -void close_control_socket(struct ev_loop *loop) noexcept +void close_control_socket(EventLoop_t *loop) noexcept { if (control_socket_open) { int fd = control_socket_io.fd; - ev_io_stop(loop, &control_socket_io); + //ev_io_stop(loop, &control_socket_io); + control_socket_io.deregisterWatch(&eventLoop); close(fd); // Unlink the socket: @@ -459,24 +537,24 @@ void close_control_socket(struct ev_loop *loop) noexcept } /* handle SIGINT signal (generated by kernel when ctrl+alt+del pressed) */ -static void sigint_reboot_cb(struct ev_loop *loop, ev_signal *w, int revents) +static void sigint_reboot_cb(EventLoop_t *eloop) noexcept { service_set->stop_all_services(ShutdownType::REBOOT); } /* handle SIGQUIT (if we are system init) */ -static void sigquit_cb(struct ev_loop *loop, ev_signal *w, int revents) +static void sigquit_cb(EventLoop_t *eloop) noexcept { // This allows remounting the filesystem read-only if the dinit binary has been // unlinked. In that case the kernel holds the binary open, so that it can't be // properly removed. - close_control_socket(ev_default_loop(EVFLAG_AUTO)); + close_control_socket(eloop); execl("/sbin/shutdown", "/sbin/shutdown", (char *) 0); log(LogLevel::ERROR, "Error executing /sbin/shutdown: ", strerror(errno)); } /* handle SIGTERM/SIGQUIT - stop all services (not used for system daemon) */ -static void sigterm_cb(struct ev_loop *loop, ev_signal *w, int revents) +static void sigterm_cb(EventLoop_t *eloop) noexcept { service_set->stop_all_services(); } diff --git a/src/service.cc b/src/service.cc index 41fdbca..98de0e5 100644 --- a/src/service.cc +++ b/src/service.cc @@ -18,8 +18,8 @@ #include "dinit-log.h" // from dinit.cc: -void open_control_socket(struct ev_loop *loop) noexcept; - +void open_control_socket(EventLoop_t *loop) noexcept; +extern EventLoop_t eventLoop; // Find the requested service by name static ServiceRecord * findService(const std::list & records, @@ -91,13 +91,12 @@ void ServiceRecord::stopped() noexcept } } -void ServiceRecord::process_child_callback(struct ev_loop *loop, ev_child *w, int revents) noexcept +void ServiceChildWatcher::gotTermStat(EventLoop_t * loop, pid_t child, int status) noexcept { - ServiceRecord *sr = (ServiceRecord *) w->data; - + ServiceRecord *sr = service; + sr->pid = -1; - sr->exit_status = w->rstatus; - ev_child_stop(loop, w); + sr->exit_status = status; // Ok, for a process service, any process death which we didn't rig // ourselves is a bit... unexpected. Probably, the child died because @@ -110,7 +109,7 @@ void ServiceRecord::process_child_callback(struct ev_loop *loop, ev_child *w, in return; } - sr->handle_exit_status(); + sr->handle_exit_status(); } bool ServiceRecord::do_auto_restart() noexcept @@ -206,15 +205,22 @@ void ServiceRecord::handle_exit_status() noexcept } } -void ServiceRecord::process_child_status(struct ev_loop *loop, ev_io * stat_io, int revents) noexcept +Rearm ServiceIoWatcher::gotEvent(EventLoop_t *loop, int fd, int flags) noexcept { - ServiceRecord *sr = (ServiceRecord *) stat_io->data; + ServiceRecord::process_child_status(loop, this, flags); + return Rearm::NOOP; +} + +// TODO remove unused revents param +void ServiceRecord::process_child_status(EventLoop_t *loop, ServiceIoWatcher * stat_io, int revents) noexcept +{ + ServiceRecord *sr = stat_io->service; sr->waiting_for_execstat = false; int exec_status; int r = read(stat_io->fd, &exec_status, sizeof(int)); close(stat_io->fd); - ev_io_stop(loop, stat_io); + stat_io->deregisterWatch(loop); if (r != 0) { // We read an errno code; exec() failed, and the service startup failed. @@ -224,7 +230,7 @@ void ServiceRecord::process_child_status(struct ev_loop *loop, ev_io * stat_io, sr->failed_to_start(); } else if (sr->service_state == ServiceState::STOPPING) { - // Must be a scripted servce. We've logged the failure, but it's probably better + // Must be a scripted service. We've logged the failure, but it's probably better // not to leave the service in STARTED state: sr->stopped(); } @@ -506,10 +512,11 @@ bool ServiceRecord::read_pid_file() noexcept if (r > 0) { pidbuf[r] = 0; // store nul terminator pid = std::atoi(pidbuf); - if (kill(pid, 0) == 0) { - ev_child_init(&child_listener, process_child_callback, pid, 0); - child_listener.data = this; - ev_child_start(ev_default_loop(EVFLAG_AUTO), &child_listener); + if (kill(pid, 0) == 0) { + //ev_child_init(&child_listener, process_child_callback, pid, 0); + //child_listener.data = this; + //ev_child_start(ev_default_loop(EVFLAG_AUTO), &child_listener); + child_listener.registerWith(&eventLoop, pid); } else { log(LogLevel::ERROR, service_name, ": pid read from pidfile (", pid, ") is not valid"); @@ -546,7 +553,7 @@ void ServiceRecord::started() noexcept notifyListeners(ServiceEvent::STARTED); if (onstart_flags.rw_ready) { - open_control_socket(ev_default_loop(EVFLAG_AUTO)); + open_control_socket(&eventLoop); } if (force_stop || desired_state == ServiceState::STOPPED) { @@ -634,7 +641,16 @@ bool ServiceRecord::start_ps_process(const std::vector &cmd, bool if (forkpid == 0) { // Child process. Must not allocate memory (or otherwise risk throwing any exception) // from here until exit(). - ev_default_destroy(); // won't need that on this side, free up fds. + // TODO: we may need an equivalent for the following: + // ev_default_destroy(); // won't need that on this side, free up fds. + + // Unmask signals that we masked on startup: + sigset_t sigwait_set; + sigemptyset(&sigwait_set); + sigaddset(&sigwait_set, SIGCHLD); + sigaddset(&sigwait_set, SIGINT); + sigaddset(&sigwait_set, SIGTERM); + sigprocmask(SIG_UNBLOCK, &sigwait_set, NULL); constexpr int bufsz = ((CHAR_BIT * sizeof(pid_t) - 1) / 3 + 2) + 11; // "LISTEN_PID=" - 11 characters @@ -693,15 +709,13 @@ bool ServiceRecord::start_ps_process(const std::vector &cmd, bool pid = forkpid; // Listen for status - ev_io_init(&child_status_listener, process_child_status, pipefd[0], EV_READ); - child_status_listener.data = this; - ev_io_start(ev_default_loop(EVFLAG_AUTO), &child_status_listener); + // TODO should set this up earlier so we can handle failure case (exception) + child_status_listener.registerWith(&eventLoop, pipefd[0], in_events); // Add a process listener so we can detect when the // service stops - ev_child_init(&child_listener, process_child_callback, pid, 0); - child_listener.data = this; - ev_child_start(ev_default_loop(EVFLAG_AUTO), &child_listener); + // TODO should reserve listener, handle exceptions + child_listener.registerWith(&eventLoop, pid); waiting_for_execstat = true; return true; } diff --git a/src/service.h b/src/service.h index 35a28a3..66a3014 100644 --- a/src/service.h +++ b/src/service.h @@ -6,7 +6,9 @@ #include #include #include -#include "ev.h" + +#include "dasync.h" + #include "control.h" #include "service-listener.h" #include "service-constants.h" @@ -146,9 +148,38 @@ static std::vector separate_args(std::string &s, std::list +{ + public: + // TODO resolve clunkiness of storing this field + ServiceRecord * service; + void gotTermStat(EventLoop_t * eloop, pid_t child, int status) noexcept; + + ServiceChildWatcher(ServiceRecord * sr) noexcept : service(sr) { } +}; + +class ServiceIoWatcher : public PosixFdWatcher +{ + public: + // TODO resolve clunkiness of storing these fields + int fd; + ServiceRecord * service; + Rearm gotEvent(EventLoop_t * eloop, int fd, int flags) noexcept; + + ServiceIoWatcher(ServiceRecord * sr) noexcept : service(sr) { } + + void registerWith(EventLoop_t *loop, int fd, int flags) + { + this->fd = fd; + PosixFdWatcher::registerWith(loop, fd, flags); + } +}; class ServiceRecord { + friend class ServiceChildWatcher; + friend class ServiceIoWatcher; + typedef std::string string; string service_name; @@ -222,8 +253,8 @@ class ServiceRecord int socket_fd = -1; // For socket-activation services, this is the file // descriptor for the socket. - ev_child child_listener; - ev_io child_status_listener; + ServiceChildWatcher child_listener; + ServiceIoWatcher child_status_listener; // All dependents have stopped. void allDepsStopped(); @@ -244,10 +275,10 @@ class ServiceRecord bool start_ps_process(const std::vector &args, bool on_console) noexcept; // Callback from libev when a child process dies - static void process_child_callback(struct ev_loop *loop, struct ev_child *w, + static void process_child_callback(EventLoop_t *loop, ServiceChildWatcher *w, int revents) noexcept; - static void process_child_status(struct ev_loop *loop, ev_io * stat_io, + static void process_child_status(EventLoop_t *loop, ServiceIoWatcher * stat_io, int revents) noexcept; void handle_exit_status() noexcept; @@ -327,7 +358,7 @@ class ServiceRecord : service_state(ServiceState::STOPPED), desired_state(ServiceState::STOPPED), auto_restart(false), pinned_stopped(false), pinned_started(false), waiting_for_deps(false), waiting_for_execstat(false), doing_recovery(false), - start_explicit(false), force_stop(false) + start_explicit(false), force_stop(false), child_listener(this), child_status_listener(this) { service_set = set; service_name = name; -- 2.25.1