+* CPBuffer: cleanup methods a bit (fix API)
* When a PROCESS service process dies, and smooth_recovery is false, probably
need to force-stop dependents even if the process itself was stopped
deliberately.
- support for listing all services
* Implement a control utility to start/stop services after dinit has started
- very basic version exists, needs thorough cleanup
-* Clean up tree a little, move source files into "src"
+* We've replaced libev, so that we don't abort on failure. But now exceptions
+ can be thrown when we register an event watch - need to handle these.
For version 1.0:
----------------
sends terminated process IDs over a pipe to Dinit. Finally, it may be possible
to run dinit (and subprocesses) in a new PID namespace (again linux-only).
* Allow logging tasks to memory (growing or circular buffer) and later
- switching to disk logging (allows for filesystem mounted readonly on boot)
+ switching to disk logging (allows for filesystem mounted readonly on boot).
+ But perhaps this really the responsibility of another daemon.
* Rate control on process respawn
* Allow running services with different resource limits, chroot, cgroups,
namespaces (pid/fs/uid), etc
factor.
* Cron-like tasks (if started, they run a sub-task periodically. Stopping the
task will wait until the sub-task is complete).
-* Socket activation of services? Not sure if enough non-SystemD derived
- daemons actually support this to warrant implementing it.
* Allow to run services attached to virtual tty, allow connection to that tty (ala "screen").
* SystemD-like handling of filesystem mounts (see autofs documentation in kernel)
i.e. a mount point gets an autofs attached, and lazily gets mounted when accessed
$(CXX) -o dinit-reboot dinit-reboot.o
$(objects): %.o: %.cc service.h dinit-log.h control.h control-cmds.h cpbuffer.h
- $(CXX) $(CXXOPTS) -c $< -o $@
+ $(CXX) $(CXXOPTS) -Idasync -c $< -o $@
#install: all
char outbuf[] = { DINIT_RP_BADREQ };
if (! queuePacket(outbuf, 1)) return;
bad_conn_close = true;
- ev_io_set(&iob, iob.fd, EV_WRITE);
+ iob.setWatchFlags(out_events);
}
return;
}
char badreqRep[] = { DINIT_RP_BADREQ };
if (! queuePacket(badreqRep, 1)) return;
bad_conn_close = true;
- ev_io_set(&iob, iob.fd, EV_WRITE);
+ iob.setWatchFlags(out_events);
return;
}
char badreqRep[] = { DINIT_RP_BADREQ };
if (! queuePacket(badreqRep, 1)) return;
bad_conn_close = true;
- ev_io_set(&iob, iob.fd, EV_WRITE);
+ iob.setWatchFlags(out_events);
return;
}
else {
char badreqRep[] = { DINIT_RP_BADREQ };
if (! queuePacket(badreqRep, 1)) return;
bad_conn_close = true;
- ev_io_set(&iob, iob.fd, EV_WRITE);
+ iob.setWatchFlags(out_events);
return;
}
else {
pkt += wr;
size -= wr;
}
- ev_io_set(&iob, iob.fd, EV_READ | EV_WRITE);
+ iob.setWatchFlags(in_events | out_events);
}
// Create a vector out of the (remaining part of the) packet:
delete this;
}
else {
- ev_io_set(&iob, iob.fd, EV_WRITE);
+ iob.setWatchFlags(out_events);
}
return false;
}
}
outpkt_index = wr;
}
- ev_io_set(&iob, iob.fd, EV_READ | EV_WRITE);
+ iob.setWatchFlags(in_events | out_events);
}
try {
delete this;
}
else {
- ev_io_set(&iob, iob.fd, EV_WRITE);
+ iob.setWatchFlags(out_events);
}
return false;
}
// TODO log error?
// TODO error response?
bad_conn_close = true;
- ev_io_set(&iob, iob.fd, EV_WRITE);
+ iob.setWatchFlags(out_events);
}
return false;
outpkt_index = 0;
if (outbuf.empty() && ! oom_close) {
if (! bad_conn_close) {
- ev_io_set(&iob, iob.fd, EV_READ);
+ iob.setWatchFlags(in_events);
}
else {
delete this;
ControlConn::~ControlConn() noexcept
{
close(iob.fd);
- ev_io_stop(loop, &iob);
+ iob.deregisterWatch(loop);
// Clear service listeners
for (auto p : serviceKeyMap) {
#include <vector>
#include <unordered_map>
#include <limits>
+#include <cstddef>
#include <unistd.h>
-#include <ev++.h>
+
+#include "dasync.h"
#include "dinit-log.h"
#include "control-cmds.h"
// Control connection for dinit
+using namespace dasync;
+using EventLoop_t = EventLoop<NullMutex>;
+
// TODO: Use the input buffer as a circular buffer, instead of chomping data from
// the front using a data move.
-// forward-declaration of callback:
-static void control_conn_cb(struct ev_loop * loop, ev_io * w, int revents);
-
class ControlConn;
+class ControlConnWatcher;
+
+// forward-declaration of callback:
+static void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
// Pointer to the control connection that is listening for rollback completion
extern ControlConn * rollback_handler_conn;
class ServiceSet;
class ServiceRecord;
+class ControlConnWatcher : public PosixFdWatcher<NullMutex>
+{
+ Rearm gotEvent(EventLoop_t * loop, int fd, int flags) override
+ {
+ control_conn_cb(loop, this, flags);
+ return Rearm::REARM;
+ }
+
+ public:
+ int fd; // TODO this is already stored, find a better way to access it.
+
+ using PosixFdWatcher<NullMutex>::setWatchFlags;
+
+ void registerWith(EventLoop_t *loop, int fd, int flags)
+ {
+ this->fd = fd;
+ PosixFdWatcher<NullMutex>::registerWith(loop, fd, flags);
+ }
+};
+
class ControlConn : private ServiceListener
{
- friend void control_conn_cb(struct ev_loop *, ev_io *, int);
+ friend void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
- struct ev_io iob;
- struct ev_loop *loop;
+ ControlConnWatcher iob;
+ EventLoop_t *loop;
ServiceSet *service_set;
bool bad_conn_close = false; // close when finished output?
{
bad_conn_close = true;
oom_close = true;
- ev_io_set(&iob, iob.fd, EV_WRITE);
+ iob.setWatchFlags(out_events);
+ //ev_io_set(&iob, iob.fd, EV_WRITE);
}
// Process service event broadcast.
}
public:
- ControlConn(struct ev_loop * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0)
+ ControlConn(EventLoop_t * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0)
{
- ev_io_init(&iob, control_conn_cb, fd, EV_READ);
- iob.data = this;
- ev_io_start(loop, &iob);
+ //ev_io_init(&iob, control_conn_cb, fd, EV_READ);
+ //iob.data = this;
+ //ev_io_start(loop, &iob);
+ iob.registerWith(loop, fd, in_events);
active_control_conns++;
}
};
-static void control_conn_cb(struct ev_loop * loop, ev_io * w, int revents)
+static void control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents)
{
- ControlConn *conn = (ControlConn *) w->data;
- if (revents & EV_READ) {
+ char * cc_addr = (reinterpret_cast<char *>(watcher)) - offsetof(ControlConn, iob);
+ ControlConn *conn = reinterpret_cast<ControlConn *>(cc_addr);
+ if (revents & in_events) {
if (conn->dataReady()) {
// ControlConn was deleted
return;
}
}
- if (revents & EV_WRITE) {
+ if (revents & out_events) {
conn->sendData();
}
}
--- /dev/null
+#include <system_error>
+#include <mutex>
+#include <type_traits>
+#include <unordered_map>
+#include <vector>
+
+#include <sys/epoll.h>
+#include <sys/signalfd.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include <unistd.h>
+#include <signal.h>
+
+namespace dasync {
+
+// Event type bits
+constexpr unsigned int in_events = 1;
+constexpr unsigned int out_events = 2;
+constexpr unsigned int err_events = 4;
+
+constexpr unsigned int one_shot = 8;
+
+
+template <class Base> class EpollLoop;
+
+class EpollTraits
+{
+ template <class Base> friend class EpollLoop;
+
+ public:
+
+ class SigInfo
+ {
+ template <class Base> friend class EpollLoop;
+
+ struct signalfd_siginfo info;
+
+ public:
+ int get_signo() { return info.ssi_signo; }
+ int get_sicode() { return info.ssi_code; }
+ int get_siint() { return info.ssi_int; }
+ int get_ssiptr() { return info.ssi_ptr; }
+ int get_ssiaddr() { return info.ssi_addr; }
+
+ void set_signo(int signo) { info.ssi_signo = signo; }
+ };
+
+ class FD_r;
+
+ // File descriptor optional storage. If the mechanism can return the file descriptor, this
+ // class will be empty, otherwise it can hold a file descriptor.
+ class FD_s {
+ friend class FD_r;
+
+ // Epoll doesn't return the file descriptor (it can, but it can't return both file
+ // descriptor and user data).
+ int fd;
+ };
+
+ // File descriptor reference (passed to event callback). If the mechanism can return the
+ // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor
+ // must be stored in an FD_s instance.
+ class FD_r {
+ public:
+ int getFd(FD_s ss)
+ {
+ return ss.fd;
+ }
+ };
+};
+
+
+template <class Base> class EpollLoop : public Base
+{
+ int epfd; // epoll fd
+ int sigfd; // signalfd fd; -1 if not initialised
+ sigset_t sigmask;
+
+ std::unordered_map<int, void *> sigdataMap;
+
+ // Base contains:
+ // lock - a lock that can be used to protect internal structure.
+ // receive*() methods will be called with lock held.
+ // receiveSignal(SigInfo &, user *) noexcept
+ // receiveFdEvent(FD_r, user *, int flags) noexcept
+
+ using SigInfo = EpollTraits::SigInfo;
+ using FD_r = typename EpollTraits::FD_r;
+
+ void processEvents(epoll_event *events, int r)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
+ for (int i = 0; i < r; i++) {
+ void * ptr = events[i].data.ptr;
+
+ if (ptr == &sigfd) {
+ // Signal
+ SigInfo siginfo;
+ while (true) {
+ int r = read(sigfd, &siginfo.info, sizeof(siginfo.info));
+ if (r == -1) break;
+ if (siginfo.get_signo() != SIGCHLD) {
+ // TODO remove the special exception for SIGCHLD?
+ sigdelset(&sigmask, siginfo.get_signo());
+ }
+ auto iter = sigdataMap.find(siginfo.get_signo());
+ if (iter != sigdataMap.end()) {
+ void *userdata = (*iter).second;
+ Base::receiveSignal(siginfo, userdata);
+ }
+ }
+ signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
+ }
+ else {
+ int flags = 0;
+ (events[i].events & EPOLLIN) && (flags |= in_events);
+ (events[i].events & EPOLLHUP) && (flags |= in_events);
+ (events[i].events & EPOLLOUT) && (flags |= out_events);
+ (events[i].events & EPOLLERR) && (flags |= err_events);
+ Base::receiveFdEvent(*this, FD_r(), ptr, flags);
+ }
+ }
+ }
+
+ public:
+
+ /**
+ * EpollLoop constructor.
+ *
+ * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
+ */
+ EpollLoop() : sigfd(-1)
+ {
+ epfd = epoll_create1(EPOLL_CLOEXEC);
+ if (epfd == -1) {
+ throw std::system_error(errno, std::system_category());
+ }
+ sigemptyset(&sigmask);
+ }
+
+ ~EpollLoop()
+ {
+ close(epfd);
+ if (sigfd != -1) {
+ close(sigfd);
+ }
+ }
+
+ // flags: in_events | out_events
+ void addFdWatch(int fd, void *userdata, int flags)
+ {
+ struct epoll_event epevent;
+ // epevent.data.fd = fd;
+ epevent.data.ptr = userdata;
+ epevent.events = 0;
+
+ if (flags & one_shot) {
+ epevent.events = EPOLLONESHOT;
+ }
+ if (flags & in_events) {
+ epevent.events |= EPOLLIN;
+ }
+ if (flags & out_events) {
+ epevent.events |= EPOLLOUT;
+ }
+
+ if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epevent) == -1) {
+ throw new std::system_error(errno, std::system_category());
+ }
+ }
+
+ void removeFdWatch(int fd)
+ {
+ epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr);
+ }
+
+ void removeFdWatch_nolock(int fd)
+ {
+ removeFdWatch(fd);
+ }
+
+ // Note this will *replace* the old flags with the new, that is,
+ // it can enable *or disable* read/write events.
+ void enableFdWatch(int fd, void *userdata, int flags)
+ {
+ struct epoll_event epevent;
+ // epevent.data.fd = fd;
+ epevent.data.ptr = userdata;
+ epevent.events = 0;
+
+ if (flags & one_shot) {
+ epevent.events = EPOLLONESHOT;
+ }
+ if (flags & in_events) {
+ epevent.events |= EPOLLIN;
+ }
+ if (flags & out_events) {
+ epevent.events |= EPOLLOUT;
+ }
+
+ if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) {
+ throw new std::system_error(errno, std::system_category());
+ }
+ }
+
+ void enableFdWatch_nolock(int fd, void *userdata, int flags)
+ {
+ enableFdWatch(fd, userdata, flags);
+ }
+
+ void disableFdWatch(int fd)
+ {
+ struct epoll_event epevent;
+ // epevent.data.fd = fd;
+ epevent.data.ptr = nullptr;
+ epevent.events = 0;
+
+ // Epoll documentation says that hangup will still be reported, need to check
+ // whether this is really the case. Suspect it is really only the case if
+ // EPOLLIN is set.
+ if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) {
+ throw new std::system_error(errno, std::system_category());
+ }
+ }
+
+ // Note signal should be masked before call.
+ void addSignalWatch(int signo, void *userdata)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
+ sigdataMap[signo] = userdata;
+
+ // Modify the signal fd to watch the new signal
+ bool was_no_sigfd = (sigfd == -1);
+ sigaddset(&sigmask, signo);
+ sigfd = signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
+ if (sigfd == -1) {
+ throw new std::system_error(errno, std::system_category());
+ }
+
+ if (was_no_sigfd) {
+ // Add the signalfd to the epoll set.
+ struct epoll_event epevent;
+ epevent.data.ptr = &sigfd;
+ epevent.events = EPOLLIN;
+ // No need for EPOLLONESHOT - we can pull the signals out
+ // as we see them.
+ if (epoll_ctl(epfd, EPOLL_CTL_ADD, sigfd, &epevent) == -1) {
+ close(sigfd);
+ throw new std::system_error(errno, std::system_category());
+ }
+ }
+ }
+
+ // Note, called with lock held:
+ void rearmSignalWatch_nolock(int signo) noexcept
+ {
+ sigaddset(&sigmask, signo);
+ signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
+ }
+
+ void removeSignalWatch_nolock(int signo) noexcept
+ {
+ sigdelset(&sigmask, signo);
+ signalfd(sigfd, &sigmask, 0);
+ }
+
+ void removeSignalWatch(int signo) noexcept
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ removeSignalWatch_nolock(signo);
+ }
+
+ // If events are pending, process an unspecified number of them.
+ // If no events are pending, wait until one event is received and
+ // process this event (and possibly any other events received
+ // simultaneously).
+ // If processing an event removes a watch, there is a possibility
+ // that the watched event will still be reported (if it has
+ // occurred) before pullEvents() returns.
+ //
+ // do_wait - if false, returns immediately if no events are
+ // pending.
+ void pullEvents(bool do_wait)
+ {
+ epoll_event events[16];
+ int r = epoll_wait(epfd, events, 16, do_wait ? -1 : 0);
+ if (r == -1 || r == 0) {
+ // signal or no events
+ return;
+ }
+
+ processEvents(events, r);
+ }
+
+ // If events are pending, process one of them.
+ // If no events are pending, wait until one event is received and
+ // process this event.
+ //
+ // do_wait - if false, returns immediately if no events are
+ // pending.
+ void pullOneEvent(bool do_wait)
+ {
+ epoll_event events[1];
+ int r = epoll_wait(epfd, events, 1, do_wait ? -1 : 0);
+ if (r == -1 || r == 0) {
+ // signal or no events
+ return;
+ }
+
+ processEvents(events, r);
+ }
+
+ // Interrupt any current poll operation (pullEvents/pullOneEvent), causing
+ // it to to return immediately.
+ void interruptWait()
+ {
+ // TODO
+ }
+};
+
+// Map of pid_t to void *, with possibility of reserving entries so that mappings can
+// be later added with no danger of allocator exhaustion (bad_alloc).
+class pid_map
+{
+ using pair = std::pair<pid_t, void *>;
+ std::unordered_map<pid_t, void *> base_map;
+ std::vector<pair> backup_vector;
+
+ // Number of entries in backup_vector that are actually in use (as opposed
+ // to simply reserved):
+ int backup_size = 0;
+
+ public:
+ using entry = std::pair<bool, void *>;
+
+ entry get(pid_t key) noexcept
+ {
+ auto it = base_map.find(key);
+ if (it == base_map.end()) {
+ // Not in map; look in vector
+ for (int i = 0; i < backup_size; i++) {
+ if (backup_vector[i].first == key) {
+ return entry(true, backup_vector[i].second);
+ }
+ }
+
+ return entry(false, nullptr);
+ }
+
+ return entry(true, it->second);
+ }
+
+ entry erase(pid_t key) noexcept
+ {
+ auto iter = base_map.find(key);
+ if (iter != base_map.end()) {
+ entry r(true, iter->second);
+ base_map.erase(iter);
+ return r;
+ }
+ for (int i = 0; i < backup_size; i++) {
+ if (backup_vector[i].first == key) {
+ entry r(true, backup_vector[i].second);
+ backup_vector.erase(backup_vector.begin() + i);
+ return r;
+ }
+ }
+ return entry(false, nullptr);
+ }
+
+ // Throws bad_alloc on reservation failure
+ void reserve()
+ {
+ backup_vector.resize(backup_vector.size() + 1);
+ }
+
+ void add(pid_t key, void *val) // throws std::bad_alloc
+ {
+ base_map[key] = val;
+ }
+
+ void add_from_reserve(pid_t key, void *val) noexcept
+ {
+ try {
+ base_map[key] = val;
+ backup_vector.resize(backup_vector.size() - 1);
+ }
+ catch (std::bad_alloc &) {
+ // We couldn't add into the map, use the reserve:
+ backup_vector[backup_size++] = pair(key, val);
+ }
+ }
+};
+
+template <class Base> class ChildProcEvents : public Base
+{
+ private:
+ pid_map child_waiters;
+
+ using SigInfo = typename Base::SigInfo;
+
+ protected:
+ void receiveSignal(SigInfo &siginfo, void *userdata)
+ {
+ if (siginfo.get_signo() == SIGCHLD) {
+ int status;
+ pid_t child;
+ while ((child = waitpid(-1, &status, WNOHANG)) > 0) {
+ pid_map::entry ent = child_waiters.erase(child);
+ if (ent.first) {
+ Base::receiveChildStat(child, status, ent.second);
+ }
+ }
+ }
+ else {
+ Base::receiveSignal(siginfo, userdata);
+ }
+ }
+
+ public:
+ void reserveChildWatch()
+ {
+ child_waiters.reserve();
+ }
+
+ void addChildWatch(pid_t child, void *val)
+ {
+ child_waiters.add(child, val);
+ }
+
+ void addReservedChildWatch(pid_t child, void *val) noexcept
+ {
+ child_waiters.add_from_reserve(child, val);
+ }
+};
+
+} // end namespace
--- /dev/null
+#ifndef DASYNC_H_INCLUDED
+#define DASYNC_H_INCLUDED
+
+#include "dasync-aen.h"
+
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <cstddef>
+
+#include "dmutex.h"
+
+
+
+// TODO consider using atomic variables instead of explicit locking where appropriate
+
+// Allow optimisation of empty classes by including this in the body:
+// May be included as the last entry for a class which is only
+// _potentially_ empty.
+
+/*
+#ifdef __GNUC__
+#ifdef __clang__
+#define EMPTY_BODY private: char empty_fill[0];
+#else
+#define EMPTY_BODY private: char empty_fill[0];
+#endif
+#else
+#define EMPTY_BODY
+#endif
+*/
+
+namespace dasync {
+
+
+/**
+ * Values for rearm/disarm return from event handlers
+ */
+enum class Rearm
+{
+ /** Re-arm the event watcher so that it receives further events */
+ REARM,
+ /** Disarm the event watcher so that it receives no further events, until it is re-armed explicitly */
+ DISARM,
+ /** Remove the event watcher (and call "removed" callback) */
+ REMOVE,
+ /** Leave in current state */
+ NOOP
+// TODO: add a REMOVED option, which means, "I removed myself, DON'T TOUCH ME"
+};
+
+
+// Forward declarations:
+template <typename T_Mutex> class EventLoop;
+template <typename T_Mutex> class PosixFdWatcher;
+template <typename T_Mutex> class PosixSignalWatcher;
+template <typename T_Mutex> class PosixChildWatcher;
+
+// Information about a received signal.
+// This is essentially a wrapper for the POSIX siginfo_t; its existence allows for mechanisms that receive
+// equivalent signal information in a different format (eg signalfd on Linux).
+using SigInfo = EpollTraits::SigInfo;
+
+namespace dprivate {
+ // (non-public API)
+
+ enum class WatchType
+ {
+ SIGNAL,
+ FD,
+ CHILD,
+ SECONDARYFD
+ };
+
+ template <typename T_Mutex, typename Traits> class EventDispatch;
+
+ // For FD watchers:
+ // Use this watch flag to indicate that in and out events should be reported separately,
+ // that is, watcher should not be disabled until all watched event types are queued.
+ constexpr static int multi_watch = 4;
+
+ // Represents a queued event notification
+ class BaseWatcher
+ {
+ template <typename T_Mutex, typename Traits> friend class EventDispatch;
+ template <typename T_Mutex> friend class dasync::EventLoop;
+
+ protected:
+ WatchType watchType;
+ int active : 1;
+ int deleteme : 1;
+
+ BaseWatcher * next;
+
+ public:
+ BaseWatcher(WatchType wt) noexcept : watchType(wt), active(0), deleteme(0), next(nullptr) { }
+
+ virtual ~BaseWatcher() noexcept { }
+
+ // Called when the watcher has been removed.
+ // It is guaranteed by the caller that:
+ // - the dispatch method is not currently running
+ // - the dispatch method will not be called.
+ virtual void watchRemoved() noexcept
+ {
+ // TODO this "delete" behaviour could be dependent on a flag, perhaps?
+ // delete this;
+ }
+ };
+
+ // Base signal event - not part of public API
+ template <typename T_Mutex>
+ class BaseSignalWatcher : public BaseWatcher
+ {
+ template <typename M, typename Traits> friend class EventDispatch;
+ friend class dasync::EventLoop<T_Mutex>;
+
+ protected:
+ SigInfo siginfo;
+ BaseSignalWatcher() : BaseWatcher(WatchType::SIGNAL) { }
+
+ public:
+ typedef SigInfo &SigInfo_p;
+
+ virtual Rearm gotSignal(EventLoop<T_Mutex> * eloop, int signo, SigInfo_p siginfo) = 0;
+ };
+
+ template <typename T_Mutex>
+ class BaseFdWatcher : public BaseWatcher
+ {
+ template <typename, typename Traits> friend class EventDispatch;
+ friend class dasync::EventLoop<T_Mutex>;
+
+ protected:
+ int watch_fd;
+ int watch_flags;
+ int event_flags;
+
+ BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { }
+
+ public:
+ virtual Rearm gotEvent(EventLoop<T_Mutex> * eloop, int fd, int flags) = 0;
+ };
+
+ template <typename T_Mutex>
+ class BaseBidiFdWatcher : public BaseFdWatcher<T_Mutex>
+ {
+ template <typename, typename Traits> friend class EventDispatch;
+ friend class dasync::EventLoop<T_Mutex>;
+
+ // The main instance is the "input" watcher only; we keep a secondary watcher
+ // with a secondary set of flags for the "output" watcher:
+ BaseWatcher outWatcher = BaseWatcher(WatchType::SECONDARYFD);
+
+ // This should never actually get called:
+ Rearm gotEvent(EventLoop<T_Mutex> * eloop, int fd, int flags)
+ {
+ return Rearm::REARM; // should not be reachable.
+ };
+
+ protected:
+ int read_removed : 1; // read watch removed?
+ int write_removed : 1; // write watch removed?
+
+ public:
+ virtual Rearm readReady(EventLoop<T_Mutex> * eloop, int fd) = 0;
+ virtual Rearm writeReady(EventLoop<T_Mutex> * eloop, int fd) = 0;
+ };
+
+ template <typename T_Mutex>
+ class BaseChildWatcher : public BaseWatcher
+ {
+ template <typename, typename Traits> friend class EventDispatch;
+ friend class dasync::EventLoop<T_Mutex>;
+
+ protected:
+ pid_t watch_pid;
+ int child_status;
+
+ BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { }
+
+ public:
+ virtual void gotTermStat(EventLoop<T_Mutex> * eloop, pid_t child, int status) = 0;
+ };
+
+ // Classes for implementing a fair(ish) wait queue.
+ // A queue node can be signalled when it reaches the head of
+ // the queue.
+
+ template <typename T_Mutex> class waitqueue;
+ template <typename T_Mutex> class waitqueue_node;
+
+ // Select an appropriate conditiona variable type for a mutex:
+ // condition_variable if mutex is std::mutex, or condition_variable_any
+ // otherwise.
+ template <class T_Mutex> class condvarSelector;
+
+ template <> class condvarSelector<std::mutex>
+ {
+ public:
+ typedef std::condition_variable condvar;
+ };
+
+ template <class T_Mutex> class condvarSelector
+ {
+ public:
+ typedef std::condition_variable_any condvar;
+ };
+
+ template <> class waitqueue_node<NullMutex>
+ {
+ // Specialised waitqueue_node for NullMutex.
+ // TODO can this be reduced to 0 data members?
+ friend class waitqueue<NullMutex>;
+ waitqueue_node * next = nullptr;
+
+ public:
+ void wait(std::unique_lock<NullMutex> &ul) { }
+ void signal() { }
+ };
+
+ template <typename T_Mutex> class waitqueue_node
+ {
+ typename condvarSelector<T_Mutex>::condvar condvar;
+ friend class waitqueue<T_Mutex>;
+ waitqueue_node * next = nullptr;
+
+ public:
+ void signal()
+ {
+ condvar.notify_one();
+ }
+
+ void wait(std::unique_lock<T_Mutex> &mutex_lock)
+ {
+ condvar.wait(mutex_lock);
+ }
+ };
+
+ template <typename T_Mutex> class waitqueue
+ {
+ waitqueue_node<T_Mutex> * tail = nullptr;
+ waitqueue_node<T_Mutex> * head = nullptr;
+
+ public:
+ waitqueue_node<T_Mutex> * unqueue()
+ {
+ head = head->next;
+ return head;
+ }
+
+ waitqueue_node<T_Mutex> * getHead()
+ {
+ return head;
+ }
+
+ void queue(waitqueue_node<T_Mutex> *node)
+ {
+ if (tail) {
+ tail->next = node;
+ }
+ else {
+ head = node;
+ }
+ }
+ };
+
+ // This class serves as the base class (mixin) for the AEN mechanism class.
+ // Note that EventDispatch, here, and EventLoop (below) are really two sides of one coin;
+ // they do not work independently. The mixin pattern that we use to avoid dynamic dispatch
+ // forces them to be two seperate classes, however.
+ //
+ // The EventDispatch class maintains the queued event data structures. It inserts watchers
+ // into the queue when eventes are received (receiveXXX methods).
+ template <typename T_Mutex, typename Traits> class EventDispatch : public Traits
+ {
+ friend class EventLoop<T_Mutex>;
+
+ // queue data structure/pointer
+ BaseWatcher * first;
+
+ using BaseSignalWatcher = dasync::dprivate::BaseSignalWatcher<T_Mutex>;
+ using BaseFdWatcher = dasync::dprivate::BaseFdWatcher<T_Mutex>;
+ using BaseBidiFdWatcher = dasync::dprivate::BaseBidiFdWatcher<T_Mutex>;
+ using BaseChildWatcher = dasync::dprivate::BaseChildWatcher<T_Mutex>;
+
+ void queueWatcher(BaseWatcher *bwatcher)
+ {
+ // TODO
+ // We can't allow a queued entry to be deleted (due to the single-linked-list used for the queue)
+ // so for now, I'll set it active; but this prevents it being deleted until we can next
+ // process events, so once we have a proper linked list or better structure should probably
+ // remove this:
+ bwatcher->active = true;
+
+ // Put in queue:
+ BaseWatcher * prev_first = first;
+ first = bwatcher;
+ bwatcher->next = prev_first;
+ }
+
+ protected:
+ T_Mutex lock;
+
+ void receiveSignal(typename Traits::SigInfo & siginfo, void * userdata)
+ {
+ BaseSignalWatcher * bwatcher = static_cast<BaseSignalWatcher *>(userdata);
+ bwatcher->siginfo = siginfo;
+ queueWatcher(bwatcher);
+ }
+
+ template <typename T>
+ void receiveFdEvent(T &loop_mech, typename Traits::FD_r fd_r, void * userdata, int flags)
+ {
+ BaseFdWatcher * bfdw = static_cast<BaseFdWatcher *>(userdata);
+
+ bfdw->event_flags |= flags;
+
+ BaseWatcher * bwatcher = bfdw;
+
+ bool is_multi_watch = bfdw->watch_flags & multi_watch;
+ if (is_multi_watch) {
+ BaseBidiFdWatcher *bbdw = static_cast<BaseBidiFdWatcher *>(bwatcher);
+ if (flags & in_events && flags & out_events) {
+ // Queue the secondary watcher first:
+ queueWatcher(&bbdw->outWatcher);
+ }
+ else if (flags & out_events) {
+ // Use the secondary watcher for queueing:
+ bwatcher = &(bbdw->outWatcher);
+ }
+ }
+
+ queueWatcher(bwatcher);
+
+ if (is_multi_watch && bfdw->event_flags != bfdw->watch_flags) {
+ // We need to re-enable the other channel now:
+ loop_mech.enableFdWatch_nolock(bfdw->watch_fd, userdata,
+ (bfdw->watch_flags & ~(bfdw->event_flags)) | one_shot);
+ }
+ }
+
+ void receiveChildStat(pid_t child, int status, void * userdata)
+ {
+ BaseChildWatcher * watcher = static_cast<BaseChildWatcher *>(userdata);
+ watcher->child_status = status;
+ queueWatcher(watcher);
+ }
+
+ // TODO is this needed?:
+ BaseWatcher * pullEvent()
+ {
+ if (first) {
+ BaseWatcher * r = first;
+ first = first->next;
+ return r;
+ }
+ return nullptr;
+ }
+
+ void issueDelete(BaseWatcher *watcher) noexcept
+ {
+ // This is only called when the attention lock is held, so if the watcher is not
+ // active/queued now, it cannot become active during execution of this function.
+
+ lock.lock();
+
+ if (watcher->active) {
+ // If the watcher is active, set deleteme true; the watcher will be removed
+ // at the end of current processing (i.e. when active is set false).
+ watcher->deleteme = true;
+ }
+ else {
+ // Actually do the delete.
+ watcher->watchRemoved();
+ }
+
+ lock.unlock();
+ }
+ };
+}
+
+
+template <typename T_Mutex> class EventLoop
+{
+ friend class PosixFdWatcher<T_Mutex>;
+ friend class PosixSignalWatcher<T_Mutex>;
+ friend class PosixChildWatcher<T_Mutex>;
+
+ template <typename T, typename U> using EventDispatch = dprivate::EventDispatch<T,U>;
+ template <typename T> using waitqueue = dprivate::waitqueue<T>;
+ template <typename T> using waitqueue_node = dprivate::waitqueue_node<T>;
+ using BaseWatcher = dprivate::BaseWatcher;
+ using BaseSignalWatcher = dprivate::BaseSignalWatcher<T_Mutex>;
+ using BaseFdWatcher = dprivate::BaseFdWatcher<T_Mutex>;
+ using BaseBidiFdWatcher = dprivate::BaseBidiFdWatcher<T_Mutex>;
+ using BaseChildWatcher = dprivate::BaseChildWatcher<T_Mutex>;
+ using WatchType = dprivate::WatchType;
+
+ EpollLoop<ChildProcEvents<EventDispatch<T_Mutex, EpollTraits>>> loop_mech;
+
+ // There is a complex problem with most asynchronous event notification mechanisms
+ // when used in a multi-threaded environment. Generally, a file descriptor or other
+ // event type that we are watching will be associated with some data used to manage
+ // that event source. For example a web server needs to maintain information about
+ // each client connection, such as the state of the connection (what protocol version
+ // has been negotiated, etc; if a transfer is taking place, what file is being
+ // transferred etc).
+ //
+ // However, sometimes we want to remove an event source (eg webserver wants to drop
+ // a connection) and delete the associated data. The problem here is that it is
+ // difficult to be sure when it is ok to actually remove the data, since when
+ // requesting to unwatch the source in one thread it is still possible that an
+ // event from that source is just being reported to another thread (in which case
+ // the data will be needed).
+ //
+ // To solve that, we:
+ // - allow only one thread to poll for events at a time, using a lock
+ // - use the same lock to prevent polling, if we want to unwatch an event source
+ // - generate an event to interrupt any polling that may already be occurring in
+ // another thread
+ // - mark handlers as active if they are currently executing, and
+ // - when removing an active handler, simply set a flag which causes it to be
+ // removed once the current processing is finished, rather than removing it
+ // immediately.
+ //
+ // In particular the lock mechanism for preventing multiple threads polling and
+ // for allowing polling to be interrupted is tricky. We can't use a simple mutex
+ // since there is significant chance that it will be highly contended and there
+ // are no guarantees that its acquisition will be fair. In particular, we don't
+ // want a thread that is trying to unwatch a source being starved while another
+ // thread polls the event source.
+ //
+ // So, we use two wait queues protected by a single mutex. The "attn_waitqueue"
+ // (attention queue) is the high-priority queue, used for threads wanting to
+ // unwatch event sources. The "wait_waitquueue" is the queue used by threads
+ // that wish to actually poll for events.
+ // - The head of the "attn_waitqueue" is always the holder of the lock
+ // - Therefore, a poll-waiter must be moved from the wait_waitqueue to the
+ // attn_waitqueue to actually gain the lock. This is only done if the
+ // attn_waitqueue is otherwise empty.
+ // - The mutex only protects manipulation of the wait queues, and so should not
+ // be highly contended.
+
+ T_Mutex wait_lock; // wait lock, used to prevent multiple threads from waiting
+ // on the event queue simultaneously.
+ waitqueue<T_Mutex> attn_waitqueue;
+ waitqueue<T_Mutex> wait_waitqueue;
+
+
+ void registerSignal(BaseSignalWatcher *callBack, int signo)
+ {
+ loop_mech.addSignalWatch(signo, callBack);
+ }
+
+ void deregister(BaseSignalWatcher *callBack, int signo) noexcept
+ {
+ loop_mech.removeSignalWatch(signo);
+
+ waitqueue_node<T_Mutex> qnode;
+ getAttnLock(qnode);
+
+ EventDispatch<T_Mutex, EpollTraits> & ed = (EventDispatch<T_Mutex, EpollTraits> &) loop_mech;
+ ed.issueDelete(callBack);
+
+ releaseLock(qnode);
+ }
+
+ void registerFd(BaseFdWatcher *callback, int fd, int eventmask)
+ {
+ loop_mech.addFdWatch(fd, callback, eventmask);
+ }
+
+ void deregister(BaseFdWatcher *callback, int fd)
+ {
+ loop_mech.removeFdWatch(fd);
+
+ waitqueue_node<T_Mutex> qnode;
+ getAttnLock(qnode);
+
+ EventDispatch<T_Mutex, EpollTraits> & ed = (EventDispatch<T_Mutex, EpollTraits> &) loop_mech;
+ ed.issueDelete(callback);
+
+ releaseLock(qnode);
+ }
+
+ void reserveChildWatch(BaseChildWatcher *callBack)
+ {
+ loop_mech.addSignalWatch(SIGCHLD, nullptr); // TODO remove this kludge
+ loop_mech.reserveChildWatch();
+ }
+
+ void registerChild(BaseChildWatcher *callBack, pid_t child)
+ {
+ loop_mech.addSignalWatch(SIGCHLD, nullptr); // TODO remove this kludge
+ loop_mech.addChildWatch(child, callBack);
+ }
+
+ void registerReservedChild(BaseChildWatcher *callBack, pid_t child) noexcept
+ {
+ loop_mech.addReservedChildWatch(child, callBack);
+ }
+
+ // Acquire the attention lock (when held, ensures that no thread is polling the AEN
+ // mechanism).
+ void getAttnLock(waitqueue_node<T_Mutex> &qnode)
+ {
+ std::unique_lock<T_Mutex> ulock(wait_lock);
+ attn_waitqueue.queue(&qnode);
+ if (attn_waitqueue.getHead() != &qnode) {
+ loop_mech.interruptWait();
+ while (attn_waitqueue.getHead() != &qnode) {
+ qnode.wait(ulock);
+ }
+ }
+ }
+
+ // Acquire the poll-wait lock (to be held when polling the AEN mechanism; lower
+ // priority than the attention lock).
+ void getPollwaitLock(waitqueue_node<T_Mutex> &qnode)
+ {
+ std::unique_lock<T_Mutex> ulock(wait_lock);
+ if (attn_waitqueue.getHead() == nullptr) {
+ // Queue is completely empty:
+ attn_waitqueue.queue(&qnode);
+ }
+ else {
+ wait_waitqueue.queue(&qnode);
+ }
+
+ while (attn_waitqueue.getHead() != &qnode) {
+ qnode.wait(ulock);
+ }
+ }
+
+ // Release the poll-wait/attention lock.
+ void releaseLock(waitqueue_node<T_Mutex> &qnode)
+ {
+ std::unique_lock<T_Mutex> ulock(wait_lock);
+ waitqueue_node<T_Mutex> * nhead = attn_waitqueue.unqueue();
+ if (nhead != nullptr) {
+ nhead->signal();
+ }
+ else {
+ nhead = wait_waitqueue.getHead();
+ if (nhead != nullptr) {
+ attn_waitqueue.queue(nhead);
+ nhead->signal();
+ }
+ }
+ }
+
+ void processSignalRearm(BaseSignalWatcher * bsw, Rearm rearmType)
+ {
+ // Called with lock held
+ if (rearmType == Rearm::REARM) {
+ loop_mech.rearmSignalWatch_nolock(bsw->siginfo.get_signo());
+ }
+ else if (rearmType == Rearm::REMOVE) {
+ loop_mech.removeSignalWatch_nolock(bsw->siginfo.get_signo());
+ }
+ }
+
+ Rearm processFdRearm(BaseFdWatcher * bfw, Rearm rearmType, bool is_multi_watch)
+ {
+ // Called with lock held
+ if (is_multi_watch) {
+ BaseBidiFdWatcher * bdfw = static_cast<BaseBidiFdWatcher *>(bfw);
+
+ if (rearmType == Rearm::REMOVE) {
+ bdfw->read_removed = 1;
+ bdfw->watch_flags &= ~in_events;
+ if (! bdfw->write_removed) {
+ return Rearm::NOOP;
+ }
+ else {
+ // both removed: actually remove
+ loop_mech.removeFdWatch_nolock(bdfw->watch_fd);
+ }
+ }
+ else if (rearmType == Rearm::DISARM) {
+ // Nothing more to do
+ }
+ else if (rearmType == Rearm::REARM) {
+ bdfw->watch_flags |= in_events;
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+ static_cast<BaseWatcher *>(bdfw),
+ (bdfw->watch_flags & (in_events | out_events)) | one_shot);
+ }
+ return rearmType;
+ }
+ else {
+ if (rearmType == Rearm::REARM) {
+ loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw,
+ (bfw->watch_flags & (in_events | out_events)) | one_shot);
+ }
+ else if (rearmType == Rearm::REMOVE) {
+ loop_mech.removeFdWatch_nolock(bfw->watch_fd);
+ }
+ return rearmType;
+ }
+ }
+
+ Rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, Rearm rearmType)
+ {
+ // Called with lock held
+ if (rearmType == Rearm::REMOVE) {
+ bdfw->write_removed = 1;
+ bdfw->watch_flags &= ~out_events;
+ if (! bdfw->read_removed) {
+ return Rearm::NOOP;
+ }
+ else {
+ // both removed: actually remove
+ loop_mech.removeFdWatch_nolock(bdfw->watch_fd);
+ }
+ }
+ else if (rearmType == Rearm::DISARM) {
+ // Nothing more to do
+ }
+ else if (rearmType == Rearm::REARM) {
+ bdfw->watch_flags |= out_events;
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+ static_cast<BaseWatcher *>(bdfw),
+ (bdfw->watch_flags & (in_events | out_events)) | one_shot);
+ }
+ return rearmType;
+ }
+
+ bool processEvents() noexcept
+ {
+ EventDispatch<T_Mutex, EpollTraits> & ed = (EventDispatch<T_Mutex, EpollTraits> &) loop_mech;
+ ed.lock.lock();
+
+ // So this pulls *all* currently pending events and processes them in the current thread.
+ // That's probably good for throughput, but maybe the behavior should be configurable.
+
+ BaseWatcher * pqueue = ed.first;
+ ed.first = nullptr;
+ bool active = false;
+
+ BaseWatcher * prev = nullptr;
+ for (BaseWatcher * q = pqueue; q != nullptr; q = q->next) {
+ if (q->deleteme) {
+ q->watchRemoved();
+ if (prev) {
+ prev->next = q->next;
+ }
+ else {
+ pqueue = q->next;
+ }
+ }
+ else {
+ q->active = true;
+ active = true;
+ }
+ }
+
+ ed.lock.unlock();
+
+ while (pqueue != nullptr) {
+ Rearm rearmType = Rearm::NOOP;
+ bool is_multi_watch = false;
+ BaseBidiFdWatcher *bbfw = nullptr;
+ // (Above variables are initialised only to silence compiler warnings).
+
+ // Note that we select actions based on the type of the watch, as determined by the watchType
+ // member. In some ways this screams out for polmorphism; a virtual function could be overridden
+ // by each of the watcher types. I've instead used switch/case because I think it will perform
+ // slightly better without the overhead of a virtual function dispatch, but it's got to be a
+ // close call; I might be guilty of premature optimisation here.
+
+ switch (pqueue->watchType) {
+ case WatchType::SIGNAL: {
+ BaseSignalWatcher *bsw = static_cast<BaseSignalWatcher *>(pqueue);
+ rearmType = bsw->gotSignal(this, bsw->siginfo.get_signo(), bsw->siginfo);
+ break;
+ }
+ case WatchType::FD: {
+ BaseFdWatcher *bfw = static_cast<BaseFdWatcher *>(pqueue);
+ is_multi_watch = bfw->watch_flags & dprivate::multi_watch;
+ if (is_multi_watch) {
+ // The primary watcher for a multi-watch watcher is queued for
+ // read events.
+ BaseBidiFdWatcher *bdfw = static_cast<BaseBidiFdWatcher *>(bfw);
+ rearmType = bdfw->readReady(this, bfw->watch_fd);
+ }
+ else {
+ rearmType = bfw->gotEvent(this, bfw->watch_fd, bfw->event_flags);
+ }
+ break;
+ }
+ case WatchType::CHILD: {
+ BaseChildWatcher *bcw = static_cast<BaseChildWatcher *>(pqueue);
+ bcw->gotTermStat(this, bcw->watch_pid, bcw->child_status);
+ // Child watches automatically remove:
+ rearmType = Rearm::REMOVE;
+ break;
+ }
+ case WatchType::SECONDARYFD: {
+ // first construct a pointer to the main watcher:
+ char * rp = (char *)pqueue;
+ rp -= offsetof(BaseBidiFdWatcher, outWatcher);
+ bbfw = (BaseBidiFdWatcher *)rp;
+ rearmType = bbfw->writeReady(this, bbfw->watch_fd);
+ break;
+ }
+ default: ;
+ }
+
+ ed.lock.lock();
+
+ pqueue->active = false;
+ if (pqueue->deleteme) {
+ // We don't want a watch that is marked "deleteme" to re-arm itself.
+ // NOOP flags that the state is managed externally, so we don't adjust that.
+ if (rearmType != Rearm::NOOP) {
+ rearmType = Rearm::REMOVE;
+ }
+ }
+ switch (pqueue->watchType) {
+ case WatchType::SIGNAL:
+ processSignalRearm(static_cast<BaseSignalWatcher *>(pqueue), rearmType);
+ break;
+ case WatchType::FD:
+ rearmType = processFdRearm(static_cast<BaseFdWatcher *>(pqueue), rearmType, is_multi_watch);
+ break;
+ case WatchType::SECONDARYFD:
+ processSecondaryRearm(bbfw, rearmType);
+ break;
+ default: ;
+ }
+
+ if (pqueue->deleteme) rearmType = Rearm::REMOVE; // makes the watchRemoved() callback get called.
+
+ ed.lock.unlock();
+
+ if (rearmType == Rearm::REMOVE) {
+ pqueue->watchRemoved();
+ }
+
+ pqueue = pqueue->next;
+ }
+
+ return active;
+ }
+
+
+ public:
+ void run() noexcept
+ {
+ while (! processEvents()) {
+ waitqueue_node<T_Mutex> qnode;
+
+ // We only allow one thread to poll the mechanism at any time, since otherwise
+ // removing event watchers is a nightmare beyond comprehension.
+ getPollwaitLock(qnode);
+
+ // Pull events from the AEN mechanism and insert them in our internal queue:
+ loop_mech.pullEvents(true);
+
+ // Now release the wait lock:
+ releaseLock(qnode);
+ }
+ }
+};
+
+
+typedef EventLoop<NullMutex> NEventLoop;
+typedef EventLoop<std::mutex> TEventLoop;
+
+// from dasync.cc:
+TEventLoop & getSystemLoop();
+
+// Posix signal event watcher
+template <typename T_Mutex>
+class PosixSignalWatcher : private dprivate::BaseSignalWatcher<T_Mutex>
+{
+public:
+ using SigInfo_p = typename dprivate::BaseSignalWatcher<T_Mutex>::SigInfo_p;
+
+ // Register this watcher to watch the specified signal.
+ // If an attempt is made to register with more than one event loop at
+ // a time, behaviour is undefined.
+ inline void registerWatch(EventLoop<T_Mutex> *eloop, int signo)
+ {
+ this->deleteme = false;
+ this->siginfo.set_signo(signo);
+ eloop->registerSignal(this, signo);
+ }
+
+ inline void deregisterWatch(EventLoop<T_Mutex> *eloop) noexcept
+ {
+ eloop->deregister(this, this->siginfo.get_signo());
+ }
+
+ // virtual Rearm gotSignal(EventLoop<T_Mutex> *, int signo, SigInfo_p info) = 0;
+};
+
+// Posix file descriptor event watcher
+template <typename T_Mutex>
+class PosixFdWatcher : private dprivate::BaseFdWatcher<T_Mutex>
+{
+ protected:
+
+ // Set the types of event to watch. May not be supported for all mechanisms.
+ // Only safe to call from within the callback handler (gotEvent).
+ void setWatchFlags(int newFlags)
+ {
+ this->watch_flags = newFlags;
+ }
+
+ public:
+
+ void registerWith(EventLoop<T_Mutex> *eloop, int fd, int flags)
+ {
+ this->deleteme = false;
+ this->watch_fd = fd;
+ this->watch_flags = flags;
+ eloop->registerFd(this, fd, flags);
+ }
+
+ void deregisterWatch(EventLoop<T_Mutex> *eloop) noexcept
+ {
+ eloop->deregister(this, this->watch_fd);
+ }
+
+ // virtual Rearm gotEvent(EventLoop<T_Mutex> *, int fd, int flags) = 0;
+};
+
+// Posix child process event watcher
+template <typename T_Mutex>
+class PosixChildWatcher : private dprivate::BaseChildWatcher<T_Mutex>
+{
+ public:
+ void reserveWith(EventLoop<T_Mutex> *eloop)
+ {
+ eloop->reserveChildWatch();
+ }
+
+ void registerWith(EventLoop<T_Mutex> *eloop, pid_t child)
+ {
+ this->deleteme = false;
+ this->watch_pid = child;
+ eloop->registerChild(this, child);
+ }
+
+ void registerReserved(EventLoop<T_Mutex> *eloop, pid_t child) noexcept
+ {
+ eloop->registerReservedChild(this, child);
+ }
+
+ // virtual void gotTermStat(EventLoop<T_Mutex> *, pid_t child, int status) = 0;
+};
+
+} // namespace dasync
+
+#endif
--- /dev/null
+#ifndef D_MUTEX_H_INCLUDED
+#define D_MUTEX_H_INCLUDED
+
+//#include <pthread.h>
+#include <mutex>
+
+namespace dasync {
+
+// Simple non-recursive mutex, with priority inheritance to avoid priority inversion.
+/*
+class DMutex
+{
+ private:
+ pthread_mutex_t mutex;
+
+ public:
+ DMutex()
+ {
+ // Avoid priority inversion by using PTHREAD_PRIO_INHERIT
+ pthread_mutexattr_t attribs;
+ pthread_mutexattr_init(&attribs);
+ pthread_mutexattr_setprotocol(&attribs, PTHREAD_PRIO_INHERIT);
+ pthread_mutex_init(&mutex, &attribs);
+ }
+
+ void lock()
+ {
+ pthread_mutex_lock(&mutex);
+ }
+
+ void unlock()
+ {
+ pthread_mutex_unlock(&mutex);
+ }
+};
+*/
+
+using DMutex = std::mutex;
+
+// A "null" mutex, for which locking / unlocking actually does nothing.
+class NullMutex
+{
+ #ifdef __GNUC__
+ #ifndef __clang__
+ char empty[0]; // Make class instances take up no space (gcc)
+ #else
+ char empty[0] __attribute__((unused)); // Make class instances take up no space (clang)
+ #endif
+ #endif
+
+ public:
+ void lock() { }
+ void unlock() { }
+ void try_lock() { }
+};
+
+
+} // end of namespace
+
+
+#endif
#include <iostream>
#include <algorithm>
-#include <ev.h>
#include <unistd.h>
#include <fcntl.h>
+#include "dasync.h"
+
#include "service.h"
#include "dinit-log.h"
#include "cpbuffer.h"
+extern EventLoop_t eventLoop;
+
LogLevel log_level = LogLevel::WARN;
LogLevel cons_log_level = LogLevel::WARN;
static bool log_to_console = false; // whether we should output log messages to
static ServiceSet *service_set = nullptr; // Reference to service set
-static void log_conn_callback(struct ev_loop *loop, struct ev_io *w, int revents) noexcept;
+namespace {
+ class BufferedLogStream;
+}
-class BufferedLogStream
+// TODO just make this the callback, directly.
+static Rearm log_conn_callback(EventLoop_t *loop, BufferedLogStream *w, int revents) noexcept;
+
+namespace {
+class BufferedLogStream : public PosixFdWatcher<NullMutex>
{
public:
CPBuffer<4096> log_buffer;
- struct ev_io eviocb;
-
// Outgoing:
bool partway = false; // if we are partway throught output of a log message
bool discarded = false; // if we have discarded a message
bool special = false; // currently outputting special message?
char *special_buf; // buffer containing special message
int msg_index; // index into special message
+
+ int fd;
void init(int fd)
{
- ev_io_init(&eviocb, log_conn_callback, fd, EV_WRITE);
- eviocb.data = this;
+ //ev_io_init(&eviocb, log_conn_callback, fd, EV_WRITE);
+ //eviocb.data = this;
+ this->fd = fd;
+ }
+
+ Rearm gotEvent(EventLoop_t *loop, int fd, int flags) noexcept override
+ {
+ return log_conn_callback(loop, this, flags);
}
};
+}
// Two log streams:
// (One for main log, one for console)
static void release_console()
{
- ev_io_stop(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb);
+ //ev_io_stop(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb);
+ //log_stream[DLOG_CONS].deregisterWatch(&eventLoop); // now handled elsewhere
if (! log_to_console) {
int flags = fcntl(1, F_GETFL, 0);
fcntl(1, F_SETFL, flags & ~O_NONBLOCK);
}
}
-static void log_conn_callback(struct ev_loop * loop, ev_io * w, int revents) noexcept
+static Rearm log_conn_callback(EventLoop_t * loop, BufferedLogStream * w, int revents) noexcept
{
- auto &log_stream = *static_cast<BufferedLogStream *>(w->data);
+ auto &log_stream = *w;
if (log_stream.special) {
char * start = log_stream.special_buf + log_stream.msg_index;
}
else {
log_stream.msg_index += r;
- return;
+ return Rearm::REARM;
}
}
else {
// other error?
// TODO
}
- return;
+ return Rearm::REARM;
}
else {
// Writing from the regular circular buffer
if (log_stream.current_index == 0) {
release_console();
- return;
+ return Rearm::REMOVE;
}
char *ptr = log_stream.log_buffer.get_ptr(0);
if (log_stream.current_index == 0 || !log_to_console) {
// No more messages buffered / stop logging to console:
release_console();
+ return Rearm::REMOVE;
}
}
}
// TODO
// EAGAIN / EWOULDBLOCK?
// error?
- return;
+ return Rearm::REARM;
}
}
// We've written something by the time we get here. We could fall through to below, but
// let's give other events a chance to be processed by returning now.
- return;
+ return Rearm::REARM;
}
void init_log(ServiceSet *sset) noexcept
//ev_io_init(& log_stream[DLOG_CONS].eviocb, log_conn_callback, 1, EV_WRITE);
log_stream[DLOG_CONS].init(STDOUT_FILENO);
if (log_stream[DLOG_CONS].current_index > 0) {
- ev_io_start(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb);
+ //ev_io_start(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb);
+ log_stream[DLOG_CONS].registerWith(&eventLoop, log_stream[DLOG_CONS].fd, out_events);
}
log_to_console = true;
}
if (log_stream[DLOG_CONS].current_index > 0) {
// Try to flush any messages that are currently buffered. (Console is non-blocking
// so it will fail gracefully).
- log_conn_callback(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb, EV_WRITE);
+ log_conn_callback(&eventLoop, &log_stream[DLOG_CONS], out_events);
}
else {
release_console();
+ log_stream[DLOG_CONS].deregisterWatch(&eventLoop);
}
}
// (if we're partway through logging a message, we release the console when
bool was_first = (log_stream[DLOG_CONS].current_index == 0);
log_stream[DLOG_CONS].current_index += amount;
if (was_first && log_to_console) {
- ev_io_start(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb);
+ //ev_io_start(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb);
+ log_stream[DLOG_CONS].registerWith(&eventLoop, log_stream[DLOG_CONS].fd, out_events);
}
}
else {
#include <fcntl.h>
#include <pwd.h>
+#include "dasync.h"
#include "service.h"
-#include "ev++.h"
#include "control.h"
#include "dinit-log.h"
*/
-static void sigint_reboot_cb(struct ev_loop *loop, ev_signal *w, int revents);
-static void sigquit_cb(struct ev_loop *loop, ev_signal *w, int revents);
-static void sigterm_cb(struct ev_loop *loop, ev_signal *w, int revents);
-void open_control_socket(struct ev_loop *loop) noexcept;
-void close_control_socket(struct ev_loop *loop) noexcept;
+using namespace dasync;
+using EventLoop_t = EventLoop<NullMutex>;
-struct ev_io control_socket_io;
+EventLoop_t eventLoop = EventLoop_t();
+
+// TODO remove:
+//static void sigint_reboot_cb(struct ev_loop *loop, ev_signal *w, int revents);
+//static void sigquit_cb(struct ev_loop *loop, ev_signal *w, int revents);
+//static void sigterm_cb(struct ev_loop *loop, ev_signal *w, int revents);
+static void sigint_reboot_cb(EventLoop_t *eloop) noexcept;
+static void sigquit_cb(EventLoop_t *eloop) noexcept;
+static void sigterm_cb(EventLoop_t *eloop) noexcept;
+void open_control_socket(EventLoop_t *loop) noexcept;
+void close_control_socket(EventLoop_t *loop) noexcept;
+
+static void control_socket_cb(EventLoop_t *loop, int fd);
+
+class ControlSocketWatcher : public PosixFdWatcher<NullMutex>
+{
+ Rearm gotEvent(EventLoop_t * loop, int fd, int flags)
+ {
+ control_socket_cb(loop, fd);
+ return Rearm::REARM;
+ }
+
+ public:
+ // TODO the fd is already stored, must we really store it again...
+ int fd;
+
+ void registerWith(EventLoop_t * loop, int fd, int flags)
+ {
+ this->fd = fd;
+ PosixFdWatcher<NullMutex>::registerWith(loop, fd, flags);
+ }
+};
+
+ControlSocketWatcher control_socket_io;
// Variables
}
+namespace {
+ class CallbackSignalHandler : public PosixSignalWatcher<NullMutex>
+ {
+ public:
+ typedef void (*cb_func_t)(EventLoop_t *);
+
+ private:
+ cb_func_t cb_func;
+
+ public:
+ CallbackSignalHandler() : cb_func(nullptr) { }
+ CallbackSignalHandler(cb_func_t pcb_func) : cb_func(pcb_func) { }
+
+ void setCbFunc(cb_func_t cb_func)
+ {
+ this->cb_func = cb_func;
+ }
+
+ Rearm gotSignal(EventLoop_t * eloop, int signo, SigInfo_p siginfo) override
+ {
+ service_set->stop_all_services(ShutdownType::REBOOT);
+ return Rearm::REARM;
+ }
+ };
+
+ class ControlSocketWatcher : public PosixFdWatcher<NullMutex>
+ {
+ Rearm gotEvent(EventLoop_t * loop, int fd, int flags)
+ {
+ control_socket_cb(loop, fd);
+ return Rearm::REARM;
+ }
+ };
+}
+
int main(int argc, char **argv)
{
using namespace std;
/* Set up signal handlers etc */
/* SIG_CHILD is ignored by default: good */
- /* sigemptyset(&sigwait_set); */
- /* sigaddset(&sigwait_set, SIGCHLD); */
- /* sigaddset(&sigwait_set, SIGINT); */
- /* sigaddset(&sigwait_set, SIGTERM); */
- /* sigprocmask(SIG_BLOCK, &sigwait_set, NULL); */
+ sigset_t sigwait_set;
+ sigemptyset(&sigwait_set);
+ sigaddset(&sigwait_set, SIGCHLD);
+ sigaddset(&sigwait_set, SIGINT);
+ sigaddset(&sigwait_set, SIGTERM);
+ sigprocmask(SIG_BLOCK, &sigwait_set, NULL);
// Terminal access control signals - we block these so that dinit can't be
// suspended if it writes to the terminal after some other process has claimed
}
// Set up signal handlers
- ev_signal sigint_ev_signal;
+ //ev_signal sigint_ev_signal;
+ CallbackSignalHandler sigint_watcher;
if (am_system_init) {
- ev_signal_init(&sigint_ev_signal, sigint_reboot_cb, SIGINT);
+ //ev_signal_init(&sigint_ev_signal, sigint_reboot_cb, SIGINT);
+ sigint_watcher.setCbFunc(sigint_reboot_cb);
}
else {
- ev_signal_init(&sigint_ev_signal, sigterm_cb, SIGINT);
+ //ev_signal_init(&sigint_ev_signal, sigterm_cb, SIGINT);
+ sigint_watcher.setCbFunc(sigterm_cb);
}
- ev_signal sigquit_ev_signal;
+ //ev_signal sigquit_ev_signal;
+ CallbackSignalHandler sigquit_watcher;
if (am_system_init) {
// PID 1: SIGQUIT exec's shutdown
- ev_signal_init(&sigquit_ev_signal, sigquit_cb, SIGQUIT);
+ //ev_signal_init(&sigquit_ev_signal, sigquit_cb, SIGQUIT);
+ sigquit_watcher.setCbFunc(sigquit_cb);
}
else {
// Otherwise: SIGQUIT terminates dinit
- ev_signal_init(&sigquit_ev_signal, sigterm_cb, SIGQUIT);
+ //ev_signal_init(&sigquit_ev_signal, sigterm_cb, SIGQUIT);
+ sigquit_watcher.setCbFunc(sigterm_cb);
}
- ev_signal sigterm_ev_signal;
- ev_signal_init(&sigterm_ev_signal, sigterm_cb, SIGTERM);
+ //ev_signal sigterm_ev_signal;
+ //ev_signal_init(&sigterm_ev_signal, sigterm_cb, SIGTERM);
+ auto sigterm_watcher = CallbackSignalHandler(sigterm_cb);
/* Set up libev */
- struct ev_loop *loop = ev_default_loop(EVFLAG_AUTO /* | EVFLAG_SIGNALFD */);
- ev_signal_start(loop, &sigint_ev_signal);
- ev_signal_start(loop, &sigquit_ev_signal);
- ev_signal_start(loop, &sigterm_ev_signal);
+ //struct ev_loop *loop = ev_default_loop(EVFLAG_AUTO /* | EVFLAG_SIGNALFD */);
+ //ev_signal_start(loop, &sigint_ev_signal);
+ //ev_signal_start(loop, &sigquit_ev_signal);
+ //ev_signal_start(loop, &sigterm_ev_signal);
+ sigint_watcher.registerWatch(&eventLoop, SIGINT);
+ sigquit_watcher.registerWatch(&eventLoop, SIGQUIT);
+ sigterm_watcher.registerWatch(&eventLoop, SIGTERM);
// Try to open control socket (may fail due to readonly filesystem)
- open_control_socket(loop);
+ open_control_socket(&eventLoop);
#ifdef __linux__
if (am_system_init) {
// Process events until all services have terminated.
while (service_set->count_active_services() != 0) {
- ev_loop(loop, EVLOOP_ONESHOT);
+ // ev_loop(loop, EVLOOP_ONESHOT);
+ eventLoop.run();
}
ShutdownType shutdown_type = service_set->getShutdownType();
}
}
- close_control_socket(ev_default_loop(EVFLAG_AUTO));
+ close_control_socket(&eventLoop);
if (am_system_init) {
if (shutdown_type == ShutdownType::CONTINUE) {
// PID 1 must not actually exit, although we should never reach this point:
while (true) {
- ev_loop(loop, EVLOOP_ONESHOT);
+ // ev_loop(loop, EVLOOP_ONESHOT);
+ eventLoop.run();
}
}
}
// Callback for control socket
-static void control_socket_cb(struct ev_loop *loop, ev_io *w, int revents)
+static void control_socket_cb(EventLoop_t *loop, int sockfd)
{
// TODO limit the number of active connections. Keep a tally, and disable the
// control connection listening socket watcher if it gets high, and re-enable
// it once it falls below the maximum.
// Accept a connection
- int sockfd = w->fd;
-
int newfd = accept4(sockfd, nullptr, nullptr, SOCK_NONBLOCK | SOCK_CLOEXEC);
if (newfd != -1) {
}
}
-void open_control_socket(struct ev_loop *loop) noexcept
+void open_control_socket(EventLoop_t *loop) noexcept
{
if (! control_socket_open) {
const char * saddrname = control_socket_path;
}
control_socket_open = true;
- ev_io_init(&control_socket_io, control_socket_cb, sockfd, EV_READ);
- ev_io_start(loop, &control_socket_io);
+ //ev_io_init(&control_socket_io, control_socket_cb, sockfd, EV_READ);
+ //ev_io_start(loop, &control_socket_io);
+ control_socket_io.registerWith(&eventLoop, sockfd, in_events);
}
}
-void close_control_socket(struct ev_loop *loop) noexcept
+void close_control_socket(EventLoop_t *loop) noexcept
{
if (control_socket_open) {
int fd = control_socket_io.fd;
- ev_io_stop(loop, &control_socket_io);
+ //ev_io_stop(loop, &control_socket_io);
+ control_socket_io.deregisterWatch(&eventLoop);
close(fd);
// Unlink the socket:
}
/* handle SIGINT signal (generated by kernel when ctrl+alt+del pressed) */
-static void sigint_reboot_cb(struct ev_loop *loop, ev_signal *w, int revents)
+static void sigint_reboot_cb(EventLoop_t *eloop) noexcept
{
service_set->stop_all_services(ShutdownType::REBOOT);
}
/* handle SIGQUIT (if we are system init) */
-static void sigquit_cb(struct ev_loop *loop, ev_signal *w, int revents)
+static void sigquit_cb(EventLoop_t *eloop) noexcept
{
// This allows remounting the filesystem read-only if the dinit binary has been
// unlinked. In that case the kernel holds the binary open, so that it can't be
// properly removed.
- close_control_socket(ev_default_loop(EVFLAG_AUTO));
+ close_control_socket(eloop);
execl("/sbin/shutdown", "/sbin/shutdown", (char *) 0);
log(LogLevel::ERROR, "Error executing /sbin/shutdown: ", strerror(errno));
}
/* handle SIGTERM/SIGQUIT - stop all services (not used for system daemon) */
-static void sigterm_cb(struct ev_loop *loop, ev_signal *w, int revents)
+static void sigterm_cb(EventLoop_t *eloop) noexcept
{
service_set->stop_all_services();
}
#include "dinit-log.h"
// from dinit.cc:
-void open_control_socket(struct ev_loop *loop) noexcept;
-
+void open_control_socket(EventLoop_t *loop) noexcept;
+extern EventLoop_t eventLoop;
// Find the requested service by name
static ServiceRecord * findService(const std::list<ServiceRecord *> & records,
}
}
-void ServiceRecord::process_child_callback(struct ev_loop *loop, ev_child *w, int revents) noexcept
+void ServiceChildWatcher::gotTermStat(EventLoop_t * loop, pid_t child, int status) noexcept
{
- ServiceRecord *sr = (ServiceRecord *) w->data;
-
+ ServiceRecord *sr = service;
+
sr->pid = -1;
- sr->exit_status = w->rstatus;
- ev_child_stop(loop, w);
+ sr->exit_status = status;
// Ok, for a process service, any process death which we didn't rig
// ourselves is a bit... unexpected. Probably, the child died because
return;
}
- sr->handle_exit_status();
+ sr->handle_exit_status();
}
bool ServiceRecord::do_auto_restart() noexcept
}
}
-void ServiceRecord::process_child_status(struct ev_loop *loop, ev_io * stat_io, int revents) noexcept
+Rearm ServiceIoWatcher::gotEvent(EventLoop_t *loop, int fd, int flags) noexcept
{
- ServiceRecord *sr = (ServiceRecord *) stat_io->data;
+ ServiceRecord::process_child_status(loop, this, flags);
+ return Rearm::NOOP;
+}
+
+// TODO remove unused revents param
+void ServiceRecord::process_child_status(EventLoop_t *loop, ServiceIoWatcher * stat_io, int revents) noexcept
+{
+ ServiceRecord *sr = stat_io->service;
sr->waiting_for_execstat = false;
int exec_status;
int r = read(stat_io->fd, &exec_status, sizeof(int));
close(stat_io->fd);
- ev_io_stop(loop, stat_io);
+ stat_io->deregisterWatch(loop);
if (r != 0) {
// We read an errno code; exec() failed, and the service startup failed.
sr->failed_to_start();
}
else if (sr->service_state == ServiceState::STOPPING) {
- // Must be a scripted servce. We've logged the failure, but it's probably better
+ // Must be a scripted service. We've logged the failure, but it's probably better
// not to leave the service in STARTED state:
sr->stopped();
}
if (r > 0) {
pidbuf[r] = 0; // store nul terminator
pid = std::atoi(pidbuf);
- if (kill(pid, 0) == 0) {
- ev_child_init(&child_listener, process_child_callback, pid, 0);
- child_listener.data = this;
- ev_child_start(ev_default_loop(EVFLAG_AUTO), &child_listener);
+ if (kill(pid, 0) == 0) {
+ //ev_child_init(&child_listener, process_child_callback, pid, 0);
+ //child_listener.data = this;
+ //ev_child_start(ev_default_loop(EVFLAG_AUTO), &child_listener);
+ child_listener.registerWith(&eventLoop, pid);
}
else {
log(LogLevel::ERROR, service_name, ": pid read from pidfile (", pid, ") is not valid");
notifyListeners(ServiceEvent::STARTED);
if (onstart_flags.rw_ready) {
- open_control_socket(ev_default_loop(EVFLAG_AUTO));
+ open_control_socket(&eventLoop);
}
if (force_stop || desired_state == ServiceState::STOPPED) {
if (forkpid == 0) {
// Child process. Must not allocate memory (or otherwise risk throwing any exception)
// from here until exit().
- ev_default_destroy(); // won't need that on this side, free up fds.
+ // TODO: we may need an equivalent for the following:
+ // ev_default_destroy(); // won't need that on this side, free up fds.
+
+ // Unmask signals that we masked on startup:
+ sigset_t sigwait_set;
+ sigemptyset(&sigwait_set);
+ sigaddset(&sigwait_set, SIGCHLD);
+ sigaddset(&sigwait_set, SIGINT);
+ sigaddset(&sigwait_set, SIGTERM);
+ sigprocmask(SIG_UNBLOCK, &sigwait_set, NULL);
constexpr int bufsz = ((CHAR_BIT * sizeof(pid_t) - 1) / 3 + 2) + 11;
// "LISTEN_PID=" - 11 characters
pid = forkpid;
// Listen for status
- ev_io_init(&child_status_listener, process_child_status, pipefd[0], EV_READ);
- child_status_listener.data = this;
- ev_io_start(ev_default_loop(EVFLAG_AUTO), &child_status_listener);
+ // TODO should set this up earlier so we can handle failure case (exception)
+ child_status_listener.registerWith(&eventLoop, pipefd[0], in_events);
// Add a process listener so we can detect when the
// service stops
- ev_child_init(&child_listener, process_child_callback, pid, 0);
- child_listener.data = this;
- ev_child_start(ev_default_loop(EVFLAG_AUTO), &child_listener);
+ // TODO should reserve listener, handle exceptions
+ child_listener.registerWith(&eventLoop, pid);
waiting_for_execstat = true;
return true;
}
#include <vector>
#include <csignal>
#include <unordered_set>
-#include "ev.h"
+
+#include "dasync.h"
+
#include "control.h"
#include "service-listener.h"
#include "service-constants.h"
return r;
}
+class ServiceChildWatcher : public PosixChildWatcher<NullMutex>
+{
+ public:
+ // TODO resolve clunkiness of storing this field
+ ServiceRecord * service;
+ void gotTermStat(EventLoop_t * eloop, pid_t child, int status) noexcept;
+
+ ServiceChildWatcher(ServiceRecord * sr) noexcept : service(sr) { }
+};
+
+class ServiceIoWatcher : public PosixFdWatcher<NullMutex>
+{
+ public:
+ // TODO resolve clunkiness of storing these fields
+ int fd;
+ ServiceRecord * service;
+ Rearm gotEvent(EventLoop_t * eloop, int fd, int flags) noexcept;
+
+ ServiceIoWatcher(ServiceRecord * sr) noexcept : service(sr) { }
+
+ void registerWith(EventLoop_t *loop, int fd, int flags)
+ {
+ this->fd = fd;
+ PosixFdWatcher<NullMutex>::registerWith(loop, fd, flags);
+ }
+};
class ServiceRecord
{
+ friend class ServiceChildWatcher;
+ friend class ServiceIoWatcher;
+
typedef std::string string;
string service_name;
int socket_fd = -1; // For socket-activation services, this is the file
// descriptor for the socket.
- ev_child child_listener;
- ev_io child_status_listener;
+ ServiceChildWatcher child_listener;
+ ServiceIoWatcher child_status_listener;
// All dependents have stopped.
void allDepsStopped();
bool start_ps_process(const std::vector<const char *> &args, bool on_console) noexcept;
// Callback from libev when a child process dies
- static void process_child_callback(struct ev_loop *loop, struct ev_child *w,
+ static void process_child_callback(EventLoop_t *loop, ServiceChildWatcher *w,
int revents) noexcept;
- static void process_child_status(struct ev_loop *loop, ev_io * stat_io,
+ static void process_child_status(EventLoop_t *loop, ServiceIoWatcher * stat_io,
int revents) noexcept;
void handle_exit_status() noexcept;
: service_state(ServiceState::STOPPED), desired_state(ServiceState::STOPPED), auto_restart(false),
pinned_stopped(false), pinned_started(false), waiting_for_deps(false),
waiting_for_execstat(false), doing_recovery(false),
- start_explicit(false), force_stop(false)
+ start_explicit(false), force_stop(false), child_listener(this), child_status_listener(this)
{
service_set = set;
service_name = name;