From 8eff855aad88e40a1360c3055f01dfdda75ce3ba Mon Sep 17 00:00:00 2001 From: Davin McCall Date: Sat, 18 Jun 2016 15:11:19 +0100 Subject: [PATCH] Update Dasynq, again, and update Dinit code to reflect API changes. Hopefully the Dasynq API is now fairly stable. --- src/control.cc | 28 ++-- src/control.h | 24 ++-- src/dasynq/dasynq-childproc.h | 25 ++++ src/dasynq/dasynq-epoll.h | 13 ++ src/dasynq/dasynq-kqueue.h | 23 ++++ src/dasynq/dasynq.h | 249 +++++++++++++++++++++++++--------- src/dinit-log.cc | 18 +-- src/dinit.cc | 24 ++-- src/service.cc | 45 +++--- src/service.h | 8 +- 10 files changed, 325 insertions(+), 132 deletions(-) diff --git a/src/control.cc b/src/control.cc index 20478cf..5cc8798 100644 --- a/src/control.cc +++ b/src/control.cc @@ -51,7 +51,7 @@ bool ControlConn::processPacket() char outbuf[] = { DINIT_RP_BADREQ }; if (! queuePacket(outbuf, 1)) return false; bad_conn_close = true; - iob.setWatchFlags(OUT_EVENTS); + iob.setWatches(OUT_EVENTS); } return true; } @@ -75,7 +75,7 @@ bool ControlConn::processFindLoad(int pktType) char badreqRep[] = { DINIT_RP_BADREQ }; if (! queuePacket(badreqRep, 1)) return false; bad_conn_close = true; - iob.setWatchFlags(OUT_EVENTS); + iob.setWatches(OUT_EVENTS); return true; } @@ -151,7 +151,7 @@ bool ControlConn::processStartStop(int pktType) char badreqRep[] = { DINIT_RP_BADREQ }; if (! queuePacket(badreqRep, 1)) return false; bad_conn_close = true; - iob.setWatchFlags(OUT_EVENTS); + iob.setWatches(OUT_EVENTS); return true; } else { @@ -225,7 +225,7 @@ bool ControlConn::processUnpinService() char badreqRep[] = { DINIT_RP_BADREQ }; if (! queuePacket(badreqRep, 1)) return false; bad_conn_close = true; - iob.setWatchFlags(OUT_EVENTS); + iob.setWatches(OUT_EVENTS); return true; } else { @@ -288,7 +288,7 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept else { if ((unsigned)wr == size) { // Ok, all written. - iob.setWatchFlags(in_flag); + iob.setWatches(in_flag); return true; } pkt += wr; @@ -299,7 +299,7 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept // Create a vector out of the (remaining part of the) packet: try { outbuf.emplace_back(pkt, pkt + size); - iob.setWatchFlags(in_flag | OUT_EVENTS); + iob.setWatches(in_flag | OUT_EVENTS); return true; } catch (std::bad_alloc &baexc) { @@ -313,7 +313,7 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept return false; } else { - iob.setWatchFlags(OUT_EVENTS); + iob.setWatches(OUT_EVENTS); return true; } } @@ -343,7 +343,7 @@ bool ControlConn::queuePacket(std::vector &&pkt) noexcept else { if ((unsigned)wr == pkt.size()) { // Ok, all written. - iob.setWatchFlags(in_flag); + iob.setWatches(in_flag); return true; } outpkt_index = wr; @@ -352,7 +352,7 @@ bool ControlConn::queuePacket(std::vector &&pkt) noexcept try { outbuf.emplace_back(pkt); - iob.setWatchFlags(in_flag | OUT_EVENTS); + iob.setWatches(in_flag | OUT_EVENTS); return true; } catch (std::bad_alloc &baexc) { @@ -366,7 +366,7 @@ bool ControlConn::queuePacket(std::vector &&pkt) noexcept return false; } else { - iob.setWatchFlags(OUT_EVENTS); + iob.setWatches(OUT_EVENTS); return true; } } @@ -412,11 +412,11 @@ bool ControlConn::dataReady() noexcept // TODO log error? // TODO error response? bad_conn_close = true; - iob.setWatchFlags(OUT_EVENTS); + iob.setWatches(OUT_EVENTS); } else { int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0; - iob.setWatchFlags(IN_EVENTS | out_flags); + iob.setWatches(IN_EVENTS | out_flags); } return false; @@ -458,7 +458,7 @@ bool ControlConn::sendData() noexcept outpkt_index = 0; if (outbuf.empty() && ! oom_close) { if (! bad_conn_close) { - iob.setWatchFlags(IN_EVENTS); + iob.setWatches(IN_EVENTS); } else { return true; @@ -472,7 +472,7 @@ bool ControlConn::sendData() noexcept ControlConn::~ControlConn() noexcept { close(iob.fd); - iob.deregisterWatch(loop); + iob.deregister(*loop); // Clear service listeners for (auto p : serviceKeyMap) { diff --git a/src/control.h b/src/control.h index a184039..05b4845 100644 --- a/src/control.h +++ b/src/control.h @@ -49,14 +49,14 @@ class ServiceRecord; class ControlConnWatcher : public EventLoop_t::BidiFdWatcher { - inline Rearm receiveEvent(EventLoop_t * loop, int fd, int flags) noexcept; + inline Rearm receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept; - Rearm readReady(EventLoop_t * loop, int fd) noexcept override + Rearm readReady(EventLoop_t &loop, int fd) noexcept override { return receiveEvent(loop, fd, IN_EVENTS); } - Rearm writeReady(EventLoop_t * loop, int fd) noexcept override + Rearm writeReady(EventLoop_t &loop, int fd) noexcept override { return receiveEvent(loop, fd, OUT_EVENTS); } @@ -65,22 +65,22 @@ class ControlConnWatcher : public EventLoop_t::BidiFdWatcher int fd; // TODO this is already stored, find a better way to access it. EventLoop_t * eventLoop; - void setWatchFlags(int flags) + void setWatches(int flags) { - EventLoop_t::BidiFdWatcher::setWatchFlags(eventLoop, flags); + EventLoop_t::BidiFdWatcher::setWatches(*eventLoop, flags); } - void registerWith(EventLoop_t *loop, int fd, int flags) + void registerWith(EventLoop_t &loop, int fd, int flags) { this->fd = fd; - this->eventLoop = loop; - BidiFdWatcher::registerWith(loop, fd, flags); + this->eventLoop = &loop; + BidiFdWatcher::addWatch(loop, fd, flags); } }; -inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t * loop, int fd, int flags) noexcept +inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept { - return control_conn_cb(loop, this, flags); + return control_conn_cb(&loop, this, flags); } @@ -169,7 +169,7 @@ class ControlConn : private ServiceListener { bad_conn_close = true; oom_close = true; - iob.setWatchFlags(OUT_EVENTS); + iob.setWatches(OUT_EVENTS); } // Process service event broadcast. @@ -206,7 +206,7 @@ class ControlConn : private ServiceListener public: ControlConn(EventLoop_t * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0) { - iob.registerWith(loop, fd, IN_EVENTS); + iob.registerWith(*loop, fd, IN_EVENTS); active_control_conns++; } diff --git a/src/dasynq/dasynq-childproc.h b/src/dasynq/dasynq-childproc.h index e364613..af33e26 100644 --- a/src/dasynq/dasynq-childproc.h +++ b/src/dasynq/dasynq-childproc.h @@ -56,6 +56,11 @@ class pid_map backup_vector.resize(backup_vector.size() + 1); } + void unreserve() noexcept + { + backup_vector.resize(backup_vector.size() - 1); + } + void add(pid_t key, void *val) // throws std::bad_alloc { base_map[key] = val; @@ -113,18 +118,38 @@ template class ChildProcEvents : public Base public: void reserveChildWatch() { + std::lock_guard guard(Base::lock); child_waiters.reserve(); } + void unreserveChildWatch() noexcept + { + std::lock_guard guard(Base::lock); + child_waiters.unreserve(); + } + void addChildWatch(pid_t child, void *val) { + std::lock_guard guard(Base::lock); child_waiters.add(child, val); } void addReservedChildWatch(pid_t child, void *val) noexcept { + std::lock_guard guard(Base::lock); child_waiters.add_from_reserve(child, val); } + + void addReservedChildWatch_nolock(pid_t child, void *val) noexcept + { + child_waiters.add_from_reserve(child, val); + } + + void removeChildWatch(pid_t child) noexcept + { + std::lock_guard guard(Base::lock); + child_waiters.erase(child); + } template void init(T *loop_mech) { diff --git a/src/dasynq/dasynq-epoll.h b/src/dasynq/dasynq-epoll.h index 1ffbc56..7a93e21 100644 --- a/src/dasynq/dasynq-epoll.h +++ b/src/dasynq/dasynq-epoll.h @@ -63,6 +63,7 @@ class EpollTraits const static bool has_bidi_fd_watch = true; const static bool has_separate_rw_fd_watches = false; + const static bool supports_childwatch_reservation = true; }; @@ -167,6 +168,12 @@ template class EpollLoop : public Base } } + void addBidiFdWatch(int fd, void *userdata, int flags) + { + // No implementation. + throw std::system_error(std::make_error_code(std::errc::not_supported)); + } + void removeFdWatch(int fd, int flags) noexcept { epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr); @@ -177,6 +184,12 @@ template class EpollLoop : public Base removeFdWatch(fd, flags); } + void removeBidiFdWatch(int fd) noexcept + { + // Shouldn't be called for epoll. + removeFdWatch(fd, IN_EVENTS | OUT_EVENTS); + } + // Note this will *replace* the old flags with the new, that is, // it can enable *or disable* read/write events. void enableFdWatch(int fd, void *userdata, int flags) noexcept diff --git a/src/dasynq/dasynq-kqueue.h b/src/dasynq/dasynq-kqueue.h index 64a3828..f6d4ce2 100644 --- a/src/dasynq/dasynq-kqueue.h +++ b/src/dasynq/dasynq-kqueue.h @@ -71,6 +71,7 @@ class KqueueTraits const static bool has_bidi_fd_watch = false; const static bool has_separate_rw_fd_watches = true; + const static bool supports_childwatch_reservation = true; }; #if defined(__OpenBSD__) @@ -199,6 +200,8 @@ template class KqueueLoop : public Base void addFdWatch(int fd, void *userdata, int flags) { // TODO kqueue doesn't support EVFILE_WRITE on file fd's :/ + // Presumably they cause the kevent call to fail. We could maintain + // a separate set and use poll() (urgh). short filter = (flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE; @@ -209,6 +212,17 @@ template class KqueueLoop : public Base } } + void addBidiFdWatch(int fd, void *userdata, int flags) + { + struct kevent kev[2]; + EV_SET(&kev[0], fd, EVFILT_READ, EV_ADD, 0, 0, userdata); + EV_SET(&kev[1], fd, EVFILE_WRITE, EV_ADD, 0, 0, userdata); + + if (kevent(kqfd, kev, 2, nullptr, 0, nullptr) == -1) { + throw new std::system_error(errno, std::system_category()); + } + } + void removeFdWatch(int fd, int flags) { removeFilter((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd); @@ -218,6 +232,15 @@ template class KqueueLoop : public Base { removeFdWatch(fd, flags); } + + void removeBidiFdWatch(int fd) noexcept + { + struct kevent kev[2]; + EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata); + EV_SET(&kev[1], fd, EVFILE_WRITE, EV_DELETE, 0, 0, userdata); + + kevent(kqfd, kev, 2, nullptr, 0, nullptr); + } void enableFdWatch(int fd, void *userdata, int flags) { diff --git a/src/dasynq/dasynq.h b/src/dasynq/dasynq.h index d9b4b4a..426fd8b 100644 --- a/src/dasynq/dasynq.h +++ b/src/dasynq/dasynq.h @@ -30,6 +30,10 @@ namespace dasynq { #include #include #include +#include + +#include +#include #include "dasynq-mutex.h" @@ -159,7 +163,7 @@ namespace dprivate { public: typedef SigInfo &SigInfo_p; - virtual Rearm gotSignal(EventLoop * eloop, int signo, SigInfo_p siginfo) = 0; + virtual Rearm received(EventLoop &eloop, int signo, SigInfo_p siginfo) = 0; }; template @@ -176,7 +180,7 @@ namespace dprivate { BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { } public: - virtual Rearm gotEvent(EventLoop * eloop, int fd, int flags) = 0; + virtual Rearm fdEvent(EventLoop &eloop, int fd, int flags) = 0; }; template @@ -186,7 +190,7 @@ namespace dprivate { friend class dasynq::EventLoop; // This should never actually get called: - Rearm gotEvent(EventLoop * eloop, int fd, int flags) final + Rearm fdEvent(EventLoop &eloop, int fd, int flags) final { return Rearm::REARM; // should not be reachable. }; @@ -201,8 +205,8 @@ namespace dprivate { int write_removed : 1; // write watch removed? public: - virtual Rearm readReady(EventLoop * eloop, int fd) noexcept = 0; - virtual Rearm writeReady(EventLoop * eloop, int fd) noexcept = 0; + virtual Rearm readReady(EventLoop &eloop, int fd) noexcept = 0; + virtual Rearm writeReady(EventLoop &eloop, int fd) noexcept = 0; }; template @@ -218,7 +222,7 @@ namespace dprivate { BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { } public: - virtual void gotTermStat(EventLoop * eloop, pid_t child, int status) = 0; + virtual Rearm childStatus(EventLoop &eloop, pid_t child, int status) = 0; }; // Classes for implementing a fair(ish) wait queue. @@ -431,8 +435,6 @@ namespace dprivate { lock.lock(); - // TODO this needs to handle multi-watch (BidiFdWatcher) properly - if (watcher->active) { // If the watcher is active, set deleteme true; the watcher will be removed // at the end of current processing (i.e. when active is set false). @@ -496,6 +498,10 @@ template class EventLoop friend class dprivate::SignalWatcher>; friend class dprivate::ChildProcWatcher>; + public: + using LoopTraits = dasynq::LoopTraits; + + private: template using EventDispatch = dprivate::EventDispatch; template using waitqueue = dprivate::waitqueue; template using waitqueue_node = dprivate::waitqueue_node; @@ -587,7 +593,7 @@ template class EventLoop void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask) { if (LoopTraits::has_separate_rw_fd_watches) { - // TODO + loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT); } else { loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT); @@ -630,7 +636,7 @@ template class EventLoop void deregister(BaseBidiFdWatcher *callback, int fd) { if (LoopTraits::has_separate_rw_fd_watches) { - // TODO + loop_mech.removeBidiFdWatch(fd); } else { loop_mech.removeFdWatch(fd, callback->watch_flags); @@ -650,6 +656,11 @@ template class EventLoop loop_mech.reserveChildWatch(); } + void unreserve(BaseChildWatcher *callBack) + { + loop_mech.unreserveChildWatch(); + } + void registerChild(BaseChildWatcher *callBack, pid_t child) { loop_mech.addChildWatch(child, callBack); @@ -659,6 +670,24 @@ template class EventLoop { loop_mech.addReservedChildWatch(child, callBack); } + + void registerReservedChild_nolock(BaseChildWatcher *callBack, pid_t child) noexcept + { + loop_mech.addReservedChildWatch_nolock(child, callBack); + } + + void deregister(BaseChildWatcher *callback, pid_t child) + { + loop_mech.removeChildWatch(child); + + waitqueue_node qnode; + getAttnLock(qnode); + + EventDispatch & ed = (EventDispatch &) loop_mech; + ed.issueDelete(callback); + + releaseLock(qnode); + } void dequeueWatcher(BaseWatcher *watcher) noexcept { @@ -860,6 +889,7 @@ template class EventLoop } else if (pqueue->watchType == WatchType::SECONDARYFD) { is_multi_watch = true; + // construct a pointer to the main watcher: char * rp = (char *)pqueue; rp -= offsetof(BaseBidiFdWatcher, outWatcher); bbfw = (BaseBidiFdWatcher *)rp; @@ -879,7 +909,7 @@ template class EventLoop switch (pqueue->watchType) { case WatchType::SIGNAL: { BaseSignalWatcher *bsw = static_cast(pqueue); - rearmType = bsw->gotSignal(this, bsw->siginfo.get_signo(), bsw->siginfo); + rearmType = bsw->received(*this, bsw->siginfo.get_signo(), bsw->siginfo); break; } case WatchType::FD: { @@ -887,24 +917,20 @@ template class EventLoop if (is_multi_watch) { // The primary watcher for a multi-watch watcher is queued for // read events. - rearmType = bbfw->readReady(this, bfw->watch_fd); + rearmType = bbfw->readReady(*this, bfw->watch_fd); } else { - rearmType = bfw->gotEvent(this, bfw->watch_fd, bfw->event_flags); + rearmType = bfw->fdEvent(*this, bfw->watch_fd, bfw->event_flags); } break; } case WatchType::CHILD: { BaseChildWatcher *bcw = static_cast(pqueue); - bcw->gotTermStat(this, bcw->watch_pid, bcw->child_status); - // Child watches automatically remove: - // TODO what if they want to return REMOVED... - rearmType = Rearm::REMOVE; + rearmType = bcw->childStatus(*this, bcw->watch_pid, bcw->child_status); break; } case WatchType::SECONDARYFD: { - // first construct a pointer to the main watcher: - rearmType = bbfw->writeReady(this, bbfw->watch_fd); + rearmType = bbfw->writeReady(*this, bbfw->watch_fd); break; } default: ; @@ -956,6 +982,8 @@ template class EventLoop using SignalWatcher = dprivate::SignalWatcher>; using ChildProcWatcher = dprivate::ChildProcWatcher>; + // using LoopTraits = dasynq::LoopTraits; + void run() noexcept { while (! processEvents()) { @@ -994,20 +1022,21 @@ public: // Register this watcher to watch the specified signal. // If an attempt is made to register with more than one event loop at - // a time, behaviour is undefined. - inline void registerWatch(EventLoop *eloop, int signo) + // a time, behaviour is undefined. The signal should be masked before + // call. + inline void addWatch(EventLoop &eloop, int signo) { BaseWatcher::init(); this->siginfo.set_signo(signo); - eloop->registerSignal(this, signo); + eloop.registerSignal(this, signo); } - inline void deregisterWatch(EventLoop *eloop) noexcept + inline void deregister(EventLoop &eloop) noexcept { - eloop->deregister(this, this->siginfo.get_signo()); + eloop.deregister(this, this->siginfo.get_signo()); } - // virtual Rearm gotSignal(EventLoop *, int signo, SigInfo_p info) = 0; + // virtual Rearm received(EventLoop &, int signo, SigInfo_p info) = 0; }; // Posix file descriptor event watcher @@ -1021,7 +1050,7 @@ class FdWatcher : private dprivate::BaseFdWatcher // Set the types of event to watch. Only supported if LoopTraits::has_bidi_fd_watch // is true; otherwise has unspecified behavior. - // Only safe to call from within the callback handler (gotEvent). Might not take + // Only safe to call from within the callback handler (fdEvent). Might not take // effect until the current callback handler returns with REARM. void setWatchFlags(int newFlags) { @@ -1043,12 +1072,12 @@ class FdWatcher : private dprivate::BaseFdWatcher // causes undefined behavior. // // Can fail with std::bad_alloc or std::system_error. - void registerWith(EventLoop *eloop, int fd, int flags) + void addWatch(EventLoop &eloop, int fd, int flags) { BaseWatcher::init(); this->watch_fd = fd; this->watch_flags = flags; - eloop->registerFd(this, fd, flags); + eloop.registerFd(this, fd, flags); } // Deregister a file descriptor watcher. @@ -1060,21 +1089,21 @@ class FdWatcher : private dprivate::BaseFdWatcher // calling this method as long as the handler (if it is active) accesses no // internal state and returns Rearm::REMOVED. // TODO: implement REMOVED, or correct above statement. - void deregisterWatch(EventLoop *eloop) noexcept + void deregister(EventLoop &eloop) noexcept { - eloop->deregister(this, this->watch_fd); + eloop.deregister(this, this->watch_fd); } - void setEnabled(EventLoop *eloop, bool enable) noexcept + void setEnabled(EventLoop &eloop, bool enable) noexcept { - std::lock_guard guard(eloop->getBaseLock()); - eloop->setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable); + std::lock_guard guard(eloop.getBaseLock()); + eloop.setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable); if (! enable) { - eloop->dequeueWatcher(this); + eloop.dequeueWatcher(this); } } - // virtual Rearm gotEvent(EventLoop *, int fd, int flags) = 0; + // virtual Rearm fdEvent(EventLoop *, int fd, int flags) = 0; }; // A Bi-directional file descriptor watcher with independent read- and write- channels. @@ -1086,7 +1115,7 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcheroutWatcher; - eloop->setFdEnabled_nolock(watcher, this->watch_fd, events | ONE_SHOT, b); + eloop.setFdEnabled_nolock(watcher, this->watch_fd, events | ONE_SHOT, b); if (! b) { - eloop->dequeueWatcher(watcher); + eloop.dequeueWatcher(watcher); } } else { - eloop->setFdEnabled_nolock(this, this->watch_fd, + eloop.setFdEnabled_nolock(this, this->watch_fd, (this->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT, (this->watch_flags & (IN_EVENTS | OUT_EVENTS)) != 0); if (! b) { dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher; - eloop->dequeueWatcher(watcher); + eloop.dequeueWatcher(watcher); } } } @@ -1118,18 +1147,18 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatchergetBaseLock().lock(); + eloop.getBaseLock().lock(); setWatchEnabled(eloop, true, b); - eloop->getBaseLock().unlock(); + eloop.getBaseLock().unlock(); } - void setOutWatchEnabled(EventLoop *eloop, bool b) noexcept + void setOutWatchEnabled(EventLoop &eloop, bool b) noexcept { - eloop->getBaseLock().lock(); + eloop.getBaseLock().lock(); setWatchEnabled(eloop, false, b); - eloop->getBaseLock().unlock(); + eloop.getBaseLock().unlock(); } // Set the watch flags, which enables/disables both the in-watch and the out-watch accordingly. @@ -1139,16 +1168,16 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcher guard(eloop->getBaseLock()); + std::lock_guard guard(eloop.getBaseLock()); if (LoopTraits::has_separate_rw_fd_watches) { setWatchEnabled(eloop, true, (newFlags & IN_EVENTS) != 0); setWatchEnabled(eloop, false, (newFlags & OUT_EVENTS) != 0); } else { this->watch_flags = (this->watch_flags & ~IO_EVENTS) | newFlags; - eloop->setFdEnabled((dprivate::BaseWatcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true); + eloop.setFdEnabled((dprivate::BaseWatcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true); } } @@ -1158,13 +1187,13 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcheroutWatcher.BaseWatcher::init(); this->watch_fd = fd; this->watch_flags = flags | dprivate::multi_watch; - eloop->registerFd(this, fd, flags); + eloop.registerFd(this, fd, flags); } // Deregister a bi-direction file descriptor watcher. @@ -1176,9 +1205,9 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcherderegister(this, this->watch_fd); + eloop.deregister(this, this->watch_fd); } // Rearm readReady(EventLoop * eloop, int fd) noexcept @@ -1194,31 +1223,129 @@ class ChildProcWatcher : private dprivate::BaseChildWatcherreserveChildWatch(); + eloop.reserveChildWatch(this); + } + + void unreserve(EventLoop &eloop) + { + eloop.unreserve(this); } // Register a watcher for the given child process with an event loop. // Registration can fail with std::bad_alloc. - void registerWith(EventLoop *eloop, pid_t child) + // Note that in multi-threaded programs, use of this function may be prone to a + // race condition such that the child terminates before the watcher is registered. + void addWatch(EventLoop &eloop, pid_t child) { BaseWatcher::init(); this->watch_pid = child; - eloop->registerChild(this, child); + eloop.registerChild(this, child); } // Register a watcher for the given child process with an event loop, // after having reserved resources previously (using reserveWith). // Registration cannot fail. - void registerReserved(EventLoop *eloop, pid_t child) noexcept + // Note that in multi-threaded programs, use of this function may be prone to a + // race condition such that the child terminates before the watcher is registered. + void addReserved(EventLoop &eloop, pid_t child) noexcept { BaseWatcher::init(); - eloop->registerReservedChild(this, child); + eloop.registerReservedChild(this, child); + } + + void deregister(EventLoop &eloop, pid_t child) noexcept + { + eloop.deregister(this, child); + } + + // Fork and watch the child with this watcher on the given event loop. + // If resource limitations prevent the child process from being watched, it is + // terminated immediately (or if the implementation allows, never started), + // and a suitable std::system_error or std::bad_alloc exception is thrown. + // Returns: + // - the child pid in the parent + // - 0 in the child + pid_t fork(EventLoop &eloop) + { + if (EventLoop::LoopTraits::supports_childwatch_reservation) { + // Reserve a watch, fork, then claim reservation + reserveWatch(eloop); + + auto &lock = eloop.getBaseLock(); + lock.lock(); + + pid_t child = ::fork(); + if (child == -1) { + // Unreserve watch. + lock.unlock(); + unreserve(eloop); + throw std::system_error(errno, std::system_category()); + } + + if (child == 0) { + // I am the child + lock.unlock(); // may not really be necessary + return 0; + } + + // Register this watcher. + eloop.registerReservedChild_nolock(this, child); + lock.unlock(); + return child; + } + else { + int pipefds[2]; + if (pipe2(pipefds, O_CLOEXEC) == -1) { + throw std::system_error(errno, std::system_category()); + } + + std::lock_guard guard(eloop.getBaseLock()); + + pid_t child = ::fork(); + if (child == -1) { + throw std::system_error(errno, std::system_category()); + } + + if (child == 0) { + // I am the child + + // Wait for message from parent before continuing: + int rr; + int r = read(pipefds[0], &rr, sizeof(rr)); + while (r == -1 && errno == EINTR) { + read(pipefds[0], &rr, sizeof(rr)); + } + + if (r == -1) _exit(0); + + close(pipefds[0]); + return 0; + } + + close(pipefds[0]); // close read end + + // Register this watcher. + try { + eloop.registerChild(this, child); + + // Continue in child (it doesn't matter what is written): + write(pipefds[1], &pipefds, sizeof(int)); + close(pipefds[1]); + + return child; + } + catch (...) { + close(pipefds[1]); + throw; + } + } } - // virtual void gotTermStat(EventLoop *, pid_t child, int status) = 0; + // virtual Rearm childStatus(EventLoop &, pid_t child, int status) = 0; }; } // namespace dasynq::dprivate diff --git a/src/dinit-log.cc b/src/dinit-log.cc index 43062f0..292e780 100644 --- a/src/dinit-log.cc +++ b/src/dinit-log.cc @@ -50,7 +50,7 @@ class BufferedLogStream : public EventLoop_t::FdWatcher release = false; } - Rearm gotEvent(EventLoop_t *loop, int fd, int flags) noexcept override; + Rearm fdEvent(EventLoop_t &loop, int fd, int flags) noexcept override; // Check whether the console can be released. void flushForRelease(); @@ -63,7 +63,7 @@ class BufferedLogStream : public EventLoop_t::FdWatcher bool was_first = current_index == 0; current_index = log_buffer.get_length(); if (was_first && ! release) { - setEnabled(&eventLoop, true); + setEnabled(eventLoop, true); } } @@ -106,15 +106,15 @@ void BufferedLogStream::flushForRelease() // Try to flush any messages that are currently buffered. (Console is non-blocking // so it will fail gracefully). - if (gotEvent(&eventLoop, fd, OUT_EVENTS) == Rearm::DISARM) { + if (fdEvent(eventLoop, fd, OUT_EVENTS) == Rearm::DISARM) { // Console has already been released at this point. - setEnabled(&eventLoop, false); + setEnabled(eventLoop, false); } - // gotEvent didn't want to disarm, so must be partway through a message; will + // fdEvent didn't want to disarm, so must be partway through a message; will // release when it's finished. } -Rearm BufferedLogStream::gotEvent(EventLoop_t *loop, int fd, int flags) noexcept +Rearm BufferedLogStream::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept { auto &log_stream = *this; @@ -223,7 +223,7 @@ Rearm BufferedLogStream::gotEvent(EventLoop_t *loop, int fd, int flags) noexcept void init_log(ServiceSet *sset) { service_set = sset; - log_stream[DLOG_CONS].registerWith(&eventLoop, STDOUT_FILENO, OUT_EVENTS); // TODO register in disabled state + log_stream[DLOG_CONS].addWatch(eventLoop, STDOUT_FILENO, OUT_EVENTS); // TODO register in disabled state enable_console_log(true); } @@ -232,7 +232,7 @@ void init_log(ServiceSet *sset) void setup_main_log(int fd) { log_stream[DLOG_MAIN].init(fd); - log_stream[DLOG_MAIN].registerWith(&eventLoop, fd, OUT_EVENTS); + log_stream[DLOG_MAIN].addWatch(eventLoop, fd, OUT_EVENTS); } bool is_log_flushed() noexcept @@ -253,7 +253,7 @@ void enable_console_log(bool enable) noexcept fcntl(1, F_SETFL, flags | O_NONBLOCK); // Activate watcher: log_stream[DLOG_CONS].init(STDOUT_FILENO); - log_stream[DLOG_CONS].setEnabled(&eventLoop, true); + log_stream[DLOG_CONS].setEnabled(eventLoop, true); } else if (! enable && log_to_console) { log_stream[DLOG_CONS].flushForRelease(); diff --git a/src/dinit.cc b/src/dinit.cc index 7027cca..5fd99d5 100644 --- a/src/dinit.cc +++ b/src/dinit.cc @@ -75,9 +75,9 @@ static void control_socket_cb(EventLoop_t *loop, int fd); class ControlSocketWatcher : public EventLoop_t::FdWatcher { - Rearm gotEvent(EventLoop_t * loop, int fd, int flags) + Rearm fdEvent(EventLoop_t &loop, int fd, int flags) override { - control_socket_cb(loop, fd); + control_socket_cb(&loop, fd); return Rearm::REARM; } @@ -85,10 +85,10 @@ class ControlSocketWatcher : public EventLoop_t::FdWatcher // TODO the fd is already stored, must we really store it again... int fd; - void registerWith(EventLoop_t * loop, int fd, int flags) + void addWatch(EventLoop_t &loop, int fd, int flags) { this->fd = fd; - EventLoop_t::FdWatcher::registerWith(loop, fd, flags); + EventLoop_t::FdWatcher::addWatch(loop, fd, flags); } }; @@ -150,7 +150,7 @@ namespace { this->cb_func = cb_func; } - Rearm gotSignal(EventLoop_t * eloop, int signo, SigInfo_p siginfo) override + Rearm received(EventLoop_t &eloop, int signo, SigInfo_p siginfo) override { service_set->stop_all_services(ShutdownType::REBOOT); return Rearm::REARM; @@ -159,9 +159,9 @@ namespace { class ControlSocketWatcher : public EventLoop_t::FdWatcher { - Rearm gotEvent(EventLoop_t * loop, int fd, int flags) + Rearm fdEvent(EventLoop_t &loop, int fd, int flags) { - control_socket_cb(loop, fd); + control_socket_cb(&loop, fd); return Rearm::REARM; } }; @@ -319,9 +319,9 @@ int main(int argc, char **argv) auto sigterm_watcher = CallbackSignalHandler(sigterm_cb); - sigint_watcher.registerWatch(&eventLoop, SIGINT); - sigquit_watcher.registerWatch(&eventLoop, SIGQUIT); - sigterm_watcher.registerWatch(&eventLoop, SIGTERM); + sigint_watcher.addWatch(eventLoop, SIGINT); + sigquit_watcher.addWatch(eventLoop, SIGQUIT); + sigterm_watcher.addWatch(eventLoop, SIGTERM); // Try to open control socket (may fail due to readonly filesystem) open_control_socket(&eventLoop); @@ -505,7 +505,7 @@ static void open_control_socket(EventLoop_t *loop) noexcept } control_socket_open = true; - control_socket_io.registerWith(&eventLoop, sockfd, IN_EVENTS); + control_socket_io.addWatch(eventLoop, sockfd, IN_EVENTS); } } @@ -513,7 +513,7 @@ static void close_control_socket(EventLoop_t *loop) noexcept { if (control_socket_open) { int fd = control_socket_io.fd; - control_socket_io.deregisterWatch(&eventLoop); + control_socket_io.deregister(*loop); close(fd); // Unlink the socket: diff --git a/src/service.cc b/src/service.cc index 92328a2..770f875 100644 --- a/src/service.cc +++ b/src/service.cc @@ -101,7 +101,7 @@ void ServiceRecord::stopped() noexcept } } -void ServiceChildWatcher::gotTermStat(EventLoop_t * loop, pid_t child, int status) noexcept +dasynq::Rearm ServiceChildWatcher::childStatus(EventLoop_t &loop, pid_t child, int status) noexcept { ServiceRecord *sr = service; @@ -116,10 +116,14 @@ void ServiceChildWatcher::gotTermStat(EventLoop_t * loop, pid_t child, int statu if (sr->waiting_for_execstat) { // We still don't have an exec() status from the forked child, wait for that // before doing any further processing. - return; + return Rearm::REMOVE; } - sr->handle_exit_status(); + // Must deregister now since handle_exit_status might result in re-launch: + deregister(loop, child); + + sr->handle_exit_status(); + return Rearm::REMOVED; } bool ServiceRecord::do_auto_restart() noexcept @@ -219,9 +223,9 @@ void ServiceRecord::handle_exit_status() noexcept } } -Rearm ServiceIoWatcher::gotEvent(EventLoop_t *loop, int fd, int flags) noexcept +Rearm ServiceIoWatcher::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept { - ServiceRecord::process_child_status(loop, this, flags); + ServiceRecord::process_child_status(&loop, this, flags); return Rearm::REMOVED; } @@ -234,7 +238,7 @@ void ServiceRecord::process_child_status(EventLoop_t *loop, ServiceIoWatcher * s int exec_status; int r = read(stat_io->fd, &exec_status, sizeof(int)); close(stat_io->fd); - stat_io->deregisterWatch(loop); + stat_io->deregister(*loop); if (r > 0) { // We read an errno code; exec() failed, and the service startup failed. @@ -528,7 +532,7 @@ bool ServiceRecord::read_pid_file() noexcept pidbuf[r] = 0; // store nul terminator pid = std::atoi(pidbuf); if (kill(pid, 0) == 0) { - child_listener.registerWith(&eventLoop, pid); + child_listener.addWatch(eventLoop, pid); } else { log(LogLevel::ERROR, service_name, ": pid read from pidfile (", pid, ") is not valid"); @@ -640,10 +644,20 @@ bool ServiceRecord::start_ps_process(const std::vector &cmd, bool // TODO make sure pipefd's are not 0/1/2 (STDIN/OUT/ERR) - if they are, dup them // until they are not. - - pid_t forkpid = fork(); - if (forkpid == -1) { - // TODO log error + + pid_t forkpid; + + bool child_status_registered = false; + try { + child_status_listener.addWatch(eventLoop, pipefd[0], IN_EVENTS); + child_status_registered = true; + + forkpid = child_listener.fork(eventLoop); + } + catch (...) { + if (child_status_registered) { + child_status_listener.deregister(eventLoop); + } close(pipefd[0]); close(pipefd[1]); return false; @@ -721,17 +735,8 @@ bool ServiceRecord::start_ps_process(const std::vector &cmd, bool else { // Parent process close(pipefd[1]); // close the 'other end' fd - pid = forkpid; - // Listen for status - // TODO should set this up earlier so we can handle failure case (exception) - child_status_listener.registerWith(&eventLoop, pipefd[0], IN_EVENTS); - - // Add a process listener so we can detect when the - // service stops - // TODO should reserve listener, handle exceptions - child_listener.registerWith(&eventLoop, pid); waiting_for_execstat = true; return true; } diff --git a/src/service.h b/src/service.h index 27c6a37..a45471d 100644 --- a/src/service.h +++ b/src/service.h @@ -185,7 +185,7 @@ class ServiceChildWatcher : public EventLoop_t::ChildProcWatcher public: // TODO resolve clunkiness of storing this field ServiceRecord * service; - void gotTermStat(EventLoop_t * eloop, pid_t child, int status) noexcept; + Rearm childStatus(EventLoop_t &eloop, pid_t child, int status) noexcept; ServiceChildWatcher(ServiceRecord * sr) noexcept : service(sr) { } }; @@ -196,14 +196,14 @@ class ServiceIoWatcher : public EventLoop_t::FdWatcher // TODO resolve clunkiness of storing these fields int fd; ServiceRecord * service; - Rearm gotEvent(EventLoop_t * eloop, int fd, int flags) noexcept; + Rearm fdEvent(EventLoop_t &eloop, int fd, int flags) noexcept; ServiceIoWatcher(ServiceRecord * sr) noexcept : service(sr) { } - void registerWith(EventLoop_t *loop, int fd, int flags) + void addWatch(EventLoop_t &loop, int fd, int flags) { this->fd = fd; - EventLoop_t::FdWatcher::registerWith(loop, fd, flags); + EventLoop_t::FdWatcher::addWatch(loop, fd, flags); } }; -- 2.25.1