From: Davin McCall Date: Sun, 12 Jun 2016 23:57:20 +0000 (+0100) Subject: Update dasynq X-Git-Tag: v0.03~27 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=414c61bc36295fcce4c46c92c2897d9bf01d14ad;p=oweals%2Fdinit.git Update dasynq --- diff --git a/src/dasync/dasync-epoll.h b/src/dasync/dasync-epoll.h index 26225a2..8411fd3 100644 --- a/src/dasync/dasync-epoll.h +++ b/src/dasync/dasync-epoll.h @@ -21,6 +21,9 @@ constexpr unsigned int err_events = 4; constexpr unsigned int one_shot = 8; +// Masks: +constexpr unsigned int IO_EVENTS = in_events | out_events; + template class EpollLoop; @@ -68,6 +71,9 @@ class EpollTraits return ss.fd; } }; + + const static bool has_bidi_fd_watch = true; + const static bool has_separate_rw_fd_watches = false; }; @@ -172,19 +178,19 @@ template class EpollLoop : public Base } } - void removeFdWatch(int fd) + void removeFdWatch(int fd) noexcept { epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr); } - void removeFdWatch_nolock(int fd) + void removeFdWatch_nolock(int fd) noexcept { removeFdWatch(fd); } // Note this will *replace* the old flags with the new, that is, // it can enable *or disable* read/write events. - void enableFdWatch(int fd, void *userdata, int flags) + void enableFdWatch(int fd, void *userdata, int flags) noexcept { struct epoll_event epevent; // epevent.data.fd = fd; @@ -200,9 +206,10 @@ template class EpollLoop : public Base if (flags & out_events) { epevent.events |= EPOLLOUT; } - + if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) { - throw new std::system_error(errno, std::system_category()); + // Shouldn't be able to fail + // throw new std::system_error(errno, std::system_category()); } } @@ -211,7 +218,7 @@ template class EpollLoop : public Base enableFdWatch(fd, userdata, flags); } - void disableFdWatch(int fd) + void disableFdWatch(int fd) noexcept { struct epoll_event epevent; // epevent.data.fd = fd; @@ -222,10 +229,16 @@ template class EpollLoop : public Base // whether this is really the case. Suspect it is really only the case if // EPOLLIN is set. if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) { - throw new std::system_error(errno, std::system_category()); + // Let's assume that this can't fail. + // throw new std::system_error(errno, std::system_category()); } } + void disableFdWatch_nolock(int fd) noexcept + { + disableFdWatch(fd); + } + // Note signal should be masked before call. void addSignalWatch(int signo, void *userdata) { diff --git a/src/dasync/dasync.h b/src/dasync/dasync.h index 62a70d8..ce41252 100644 --- a/src/dasync/dasync.h +++ b/src/dasync/dasync.h @@ -75,6 +75,7 @@ enum class Rearm // Forward declarations: template class EventLoop; template class PosixFdWatcher; +template class PosixBidiFdWatcher; template class PosixSignalWatcher; template class PosixChildWatcher; @@ -112,10 +113,21 @@ namespace dprivate { int active : 1; int deleteme : 1; + BaseWatcher * prev; BaseWatcher * next; public: - BaseWatcher(WatchType wt) noexcept : watchType(wt), active(0), deleteme(0), next(nullptr) { } + + // Perform initialisation necessary before registration with an event loop + void init() + { + active = false; + deleteme = false; + prev = nullptr; + next = nullptr; + } + + BaseWatcher(WatchType wt) noexcept : watchType(wt) { } virtual ~BaseWatcher() noexcept { } @@ -170,23 +182,24 @@ namespace dprivate { template friend class EventDispatch; friend class dasync::EventLoop; - // The main instance is the "input" watcher only; we keep a secondary watcher - // with a secondary set of flags for the "output" watcher: - BaseWatcher outWatcher = BaseWatcher(WatchType::SECONDARYFD); - - // This should never actually get called: - Rearm gotEvent(EventLoop * eloop, int fd, int flags) + // This should never actually get called: + Rearm gotEvent(EventLoop * eloop, int fd, int flags) final { return Rearm::REARM; // should not be reachable. }; protected: + + // The main instance is the "input" watcher only; we keep a secondary watcher + // with a secondary set of flags for the "output" watcher: + BaseWatcher outWatcher = BaseWatcher(WatchType::SECONDARYFD); + int read_removed : 1; // read watch removed? int write_removed : 1; // write watch removed? public: - virtual Rearm readReady(EventLoop * eloop, int fd) = 0; - virtual Rearm writeReady(EventLoop * eloop, int fd) = 0; + virtual Rearm readReady(EventLoop * eloop, int fd) noexcept = 0; + virtual Rearm writeReady(EventLoop * eloop, int fd) noexcept = 0; }; template @@ -308,17 +321,39 @@ namespace dprivate { void queueWatcher(BaseWatcher *bwatcher) { - // TODO - // We can't allow a queued entry to be deleted (due to the single-linked-list used for the queue) - // so for now, I'll set it active; but this prevents it being deleted until we can next - // process events, so once we have a proper linked list or better structure should probably - // remove this: - bwatcher->active = true; - // Put in queue: - BaseWatcher * prev_first = first; - first = bwatcher; - bwatcher->next = prev_first; + if (first == nullptr) { + bwatcher->prev = bwatcher; + bwatcher->next = bwatcher; + first = bwatcher; + } + else { + first->prev->next = bwatcher; + bwatcher->prev = first->prev; + first->prev = bwatcher; + bwatcher->next = first; + } + } + + void dequeueWatcher(BaseWatcher *bwatcher) + { + if (bwatcher->prev == bwatcher) { + // Only item in queue + first = nullptr; + } + else { + if (first == bwatcher) first = first->next; + bwatcher->prev->next = bwatcher->next; + bwatcher->next->prev = bwatcher->prev; + } + + bwatcher->prev = nullptr; + bwatcher->next = nullptr; + } + + bool isQueued(BaseWatcher *bwatcher) + { + return bwatcher->prev != nullptr; } protected: @@ -355,10 +390,16 @@ namespace dprivate { queueWatcher(bwatcher); - if (is_multi_watch && bfdw->event_flags != bfdw->watch_flags) { - // We need to re-enable the other channel now: - loop_mech.enableFdWatch_nolock(bfdw->watch_fd, userdata, - (bfdw->watch_flags & ~(bfdw->event_flags)) | one_shot); + if (! LoopTraits::has_separate_rw_fd_watches) { + // If this is a bidirectional fd-watch, it has been disabled in *both* directions + // as the event was delivered. However, the other direction should not be disabled + // yet, so we need to re-enable: + int in_out_mask = in_events | out_events; + if (is_multi_watch && bfdw->event_flags != (bfdw->watch_flags & in_out_mask)) { + // We need to re-enable the other channel now: + loop_mech.enableFdWatch_nolock(bfdw->watch_fd, userdata, + (bfdw->watch_flags & ~(bfdw->event_flags)) | one_shot); + } } } @@ -369,21 +410,21 @@ namespace dprivate { queueWatcher(watcher); } - // TODO is this needed?: + // Pull a single event from the queue BaseWatcher * pullEvent() { - if (first) { - BaseWatcher * r = first; - first = first->next; - return r; + BaseWatcher * r = first; + if (r != nullptr) { + dequeueWatcher(r); } - return nullptr; + return r; } void issueDelete(BaseWatcher *watcher) noexcept { // This is only called when the attention lock is held, so if the watcher is not - // active/queued now, it cannot become active during execution of this function. + // active/queued now, it cannot become active (and will not be reported with an event) + // during execution of this function. lock.lock(); @@ -394,6 +435,11 @@ namespace dprivate { } else { // Actually do the delete. + if (isQueued(watcher)) { + dequeueWatcher(watcher); + } + + // TODO call this without lock? watcher->watchRemoved(); } @@ -406,6 +452,7 @@ namespace dprivate { template class EventLoop { friend class PosixFdWatcher; + friend class PosixBidiFdWatcher; friend class PosixSignalWatcher; friend class PosixChildWatcher; @@ -469,6 +516,10 @@ template class EventLoop waitqueue attn_waitqueue; waitqueue wait_waitqueue; + T_Mutex &getBaseLock() + { + return loop_mech.lock; + } void registerSignal(BaseSignalWatcher *callBack, int signo) { @@ -490,18 +541,38 @@ template class EventLoop void registerFd(BaseFdWatcher *callback, int fd, int eventmask) { - loop_mech.addFdWatch(fd, callback, eventmask); + loop_mech.addFdWatch(fd, callback, eventmask | one_shot); } - void setEnabled(BaseFdWatcher *callback, int fd, int watch_flags, bool enabled) + void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask) + { + if (LoopTraits::has_separate_rw_fd_watches) { + // TODO + } + else { + loop_mech.addFdWatch(fd, callback, eventmask | one_shot); + } + } + + void setFdEnabled(BaseWatcher *watcher, int fd, int watch_flags, bool enabled) { if (enabled) { - loop_mech.enableFdWatch(fd, static_cast(callback), watch_flags | one_shot); + loop_mech.enableFdWatch(fd, watcher, watch_flags | one_shot); } else { loop_mech.disableFdWatch(fd); } } + + void setFdEnabled_nolock(BaseWatcher *watcher, int fd, int watch_flags, bool enabled) + { + if (enabled) { + loop_mech.enableFdWatch_nolock(fd, watcher, watch_flags | one_shot); + } + else { + loop_mech.disableFdWatch_nolock(fd); + } + } void deregister(BaseFdWatcher *callback, int fd) { @@ -516,6 +587,24 @@ template class EventLoop releaseLock(qnode); } + void deregister(BaseBidiFdWatcher *callback, int fd) + { + if (LoopTraits::has_separate_rw_fd_watches) { + // TODO + } + else { + loop_mech.removeFdWatch(fd); + + waitqueue_node qnode; + getAttnLock(qnode); + + EventDispatch & ed = (EventDispatch &) loop_mech; + ed.issueDelete(callback); + + releaseLock(qnode); + } + } + void reserveChildWatch(BaseChildWatcher *callBack) { loop_mech.reserveChildWatch(); @@ -600,11 +689,20 @@ template class EventLoop if (rearmType == Rearm::REMOVE) { bdfw->read_removed = 1; bdfw->watch_flags &= ~in_events; - if (! bdfw->write_removed) { - return Rearm::NOOP; + + if (! LoopTraits::has_separate_rw_fd_watches) { + if (! bdfw->write_removed) { + return Rearm::NOOP; + } + else { + // both removed: actually remove + loop_mech.removeFdWatch_nolock(bdfw->watch_fd); + return Rearm::REMOVE; + } } else { - // both removed: actually remove + // TODO this will need flags for such a loop, since it can't + // otherwise distinguish which channel watch to remove loop_mech.removeFdWatch_nolock(bdfw->watch_fd); } } @@ -613,9 +711,16 @@ template class EventLoop } else if (rearmType == Rearm::REARM) { bdfw->watch_flags |= in_events; - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, - static_cast(bdfw), - (bdfw->watch_flags & (in_events | out_events)) | one_shot); + if (! LoopTraits::has_separate_rw_fd_watches) { + loop_mech.enableFdWatch_nolock(bdfw->watch_fd, + static_cast(bdfw), + (bdfw->watch_flags & (in_events | out_events)) | one_shot); + } + else { + loop_mech.enableFdWatch_nolock(bdfw->watch_fd, + static_cast(bdfw), + in_events | one_shot); + } } return rearmType; } @@ -637,12 +742,22 @@ template class EventLoop if (rearmType == Rearm::REMOVE) { bdfw->write_removed = 1; bdfw->watch_flags &= ~out_events; - if (! bdfw->read_removed) { - return Rearm::NOOP; + + if (LoopTraits::has_separate_rw_fd_watches) { + // TODO this will need flags for such a loop, since it can't + // otherwise distinguish which channel watch to remove + loop_mech.removeFdWatch_nolock(bdfw->watch_fd); + return bdfw->read_removed ? Rearm::REMOVE : Rearm::NOOP; } else { - // both removed: actually remove - loop_mech.removeFdWatch_nolock(bdfw->watch_fd); + if (! bdfw->read_removed) { + return Rearm::NOOP; + } + else { + // both removed: actually remove + loop_mech.removeFdWatch_nolock(bdfw->watch_fd); + return Rearm::REMOVE; + } } } else if (rearmType == Rearm::DISARM) { @@ -650,9 +765,16 @@ template class EventLoop } else if (rearmType == Rearm::REARM) { bdfw->watch_flags |= out_events; - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, - static_cast(bdfw), - (bdfw->watch_flags & (in_events | out_events)) | one_shot); + if (! LoopTraits::has_separate_rw_fd_watches) { + loop_mech.enableFdWatch_nolock(bdfw->watch_fd, + static_cast(bdfw), + (bdfw->watch_flags & (in_events | out_events)) | one_shot); + } + else { + loop_mech.enableFdWatch_nolock(bdfw->watch_fd, + static_cast(bdfw), + out_events | one_shot); + } } return rearmType; } @@ -665,32 +787,16 @@ template class EventLoop // So this pulls *all* currently pending events and processes them in the current thread. // That's probably good for throughput, but maybe the behavior should be configurable. - BaseWatcher * pqueue = ed.first; - ed.first = nullptr; + BaseWatcher * pqueue = ed.pullEvent(); bool active = false; - BaseWatcher * prev = nullptr; - for (BaseWatcher * q = pqueue; q != nullptr; q = q->next) { - if (q->deleteme) { - // TODO should this really be called with lock held? - q->watchRemoved(); - if (prev) { - prev->next = q->next; - } - else { - pqueue = q->next; - } - } - else { - q->active = true; - active = true; - prev = q; - } - } + while (pqueue != nullptr) { - ed.lock.unlock(); + pqueue->active = true; + active = true; + + ed.lock.unlock(); - while (pqueue != nullptr) { Rearm rearmType = Rearm::NOOP; bool is_multi_watch = false; BaseBidiFdWatcher *bbfw = nullptr; @@ -714,8 +820,8 @@ template class EventLoop if (is_multi_watch) { // The primary watcher for a multi-watch watcher is queued for // read events. - BaseBidiFdWatcher *bdfw = static_cast(bfw); - rearmType = bdfw->readReady(this, bfw->watch_fd); + bbfw = static_cast(bfw); + rearmType = bbfw->readReady(this, bfw->watch_fd); } else { rearmType = bfw->gotEvent(this, bfw->watch_fd, bfw->event_flags); @@ -731,6 +837,7 @@ template class EventLoop } case WatchType::SECONDARYFD: { // first construct a pointer to the main watcher: + is_multi_watch = true; char * rp = (char *)pqueue; rp -= offsetof(BaseBidiFdWatcher, outWatcher); bbfw = (BaseBidiFdWatcher *)rp; @@ -758,7 +865,7 @@ template class EventLoop rearmType = processFdRearm(static_cast(pqueue), rearmType, is_multi_watch); break; case WatchType::SECONDARYFD: - processSecondaryRearm(bbfw, rearmType); + rearmType = processSecondaryRearm(bbfw, rearmType); break; default: ; } @@ -768,10 +875,12 @@ template class EventLoop ed.lock.unlock(); if (rearmType == Rearm::REMOVE) { - pqueue->watchRemoved(); + (is_multi_watch ? bbfw : pqueue)->watchRemoved(); } - pqueue = pqueue->next; + ed.lock.lock(); + + pqueue = ed.pullEvent(); } return active; @@ -808,6 +917,8 @@ TEventLoop & getSystemLoop(); template class PosixSignalWatcher : private dprivate::BaseSignalWatcher { + using BaseWatcher = dprivate::BaseWatcher; + public: using SigInfo_p = typename dprivate::BaseSignalWatcher::SigInfo_p; @@ -816,7 +927,7 @@ public: // a time, behaviour is undefined. inline void registerWatch(EventLoop *eloop, int signo) { - this->deleteme = false; + BaseWatcher::init(); this->siginfo.set_signo(signo); eloop->registerSignal(this, signo); } @@ -833,10 +944,14 @@ public: template class PosixFdWatcher : private dprivate::BaseFdWatcher { + using BaseWatcher = dprivate::BaseWatcher; + protected: - // Set the types of event to watch. May not be supported for all mechanisms. - // Only safe to call from within the callback handler (gotEvent). + // Set the types of event to watch. Only supported if LoopTraits::has_bidi_fd_watch + // is true; otherwise has unspecified behavior. + // Only safe to call from within the callback handler (gotEvent). Might not take + // effect until the current callback handler returns with REARM. void setWatchFlags(int newFlags) { this->watch_flags = newFlags; @@ -858,7 +973,7 @@ class PosixFdWatcher : private dprivate::BaseFdWatcher // Can fail with std::bad_alloc or std::system_error. void registerWith(EventLoop *eloop, int fd, int flags) { - this->deleteme = false; + BaseWatcher::init(); this->watch_fd = fd; this->watch_flags = flags; eloop->registerFd(this, fd, flags); @@ -880,16 +995,110 @@ class PosixFdWatcher : private dprivate::BaseFdWatcher void setEnabled(EventLoop *eloop, bool enable) noexcept { - eloop->setEnabled(this, this->watch_fd, this->watch_flags, enable); + eloop->setFdEnabled(this, this->watch_fd, this->watch_flags, enable); } // virtual Rearm gotEvent(EventLoop *, int fd, int flags) = 0; }; +template +class PosixBidiFdWatcher : private dprivate::BaseBidiFdWatcher +{ + using BaseWatcher = dprivate::BaseWatcher; + + void setWatchEnabled(EventLoop *eloop, bool in, bool b) + { + int events = in ? in_events : out_events; + + if (b) { + this->watch_flags |= events; + } + else { + this->watch_flags &= ~events; + } + if (LoopTraits::has_separate_rw_fd_watches) { + dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher; + eloop->setFdEnabled_nolock(watcher, this->watch_fd, events | one_shot, b); + } + else { + eloop->setFdEnabled_nolock(this, this->watch_fd, + (this->watch_flags & (in_events | out_events)) | one_shot, + (this->watch_flags & (in_events | out_events)) != 0); + } + } + + protected: + + void setInWatchEnabled(EventLoop *eloop, bool b) noexcept + { + eloop->getBaseLock().lock(); + setWatchEnabled(eloop, true, b); + eloop->getBaseLock().unlock(); + } + + void setOutWatchEnabled(EventLoop *eloop, bool b) noexcept + { + eloop->getBaseLock().lock(); + setWatchEnabled(eloop, false, b); + eloop->getBaseLock().unlock(); + } + + // Set the watch flags, which enables/disables both the in-watch and the out-watch accordingly. + // This can only be called from the event handler callbacks (readReady/writeReady) and only if + // set{In|Out}WatchEnabled will not be called concurrently. + void setWatchFlags(EventLoop * eloop, int newFlags) + { + if (LoopTraits::has_separate_rw_fd_watches) { + // (In this case, this function is fully thread-safe) + std::lock_guard guard(eloop->getBaseLock()); + setWatchEnabled(eloop, true, (newFlags & in_events) != 0); + setWatchEnabled(eloop, false, (newFlags & out_events) != 0); + } + else { + this->watch_flags = (this->watch_flags & ~IO_EVENTS) | newFlags; + eloop->setFdEnabled((dprivate::BaseWatcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true); + } + } + + public: + + // Register a bi-direction file descriptor watcher with an event loop. Flags + // can be any combination of dasync::in_events / dasync::out_events. + // + // Can fail with std::bad_alloc or std::system_error. + void registerWith(EventLoop *eloop, int fd, int flags) + { + BaseWatcher::init(); + this->outWatcher.BaseWatcher::init(); + this->watch_fd = fd; + this->watch_flags = flags | dprivate::multi_watch; + eloop->registerFd(this, fd, flags); + } + + // Deregister a bi-direction file descriptor watcher. + // + // If other threads may be polling the event loop, it is not safe to assume + // the watcher is unregistered until the watchRemoved() callback is issued + // (which will not occur until the event handler returns, if it is active). + // In a single threaded environment, it is safe to delete the watcher after + // calling this method as long as the handler (if it is active) accesses no + // internal state and returns Rearm::REMOVED. + // TODO: implement REMOVED, or correct above statement. + void deregisterWatch(EventLoop *eloop) noexcept + { + eloop->deregister(this, this->watch_fd); + } + + // Rearm readReady(EventLoop * eloop, int fd) noexcept + // Rearm writeReady(EventLoop * eloop, int fd) noexcept +}; + // Posix child process event watcher template class PosixChildWatcher : private dprivate::BaseChildWatcher { + using BaseWatcher = dprivate::BaseWatcher; + public: // Reserve resources for a child watcher with the given event loop. // Reservation can fail with std::bad_alloc. @@ -902,7 +1111,7 @@ class PosixChildWatcher : private dprivate::BaseChildWatcher // Registration can fail with std::bad_alloc. void registerWith(EventLoop *eloop, pid_t child) { - this->deleteme = false; + BaseWatcher::init(); this->watch_pid = child; eloop->registerChild(this, child); } @@ -912,6 +1121,7 @@ class PosixChildWatcher : private dprivate::BaseChildWatcher // Registration cannot fail. void registerReserved(EventLoop *eloop, pid_t child) noexcept { + BaseWatcher::init(); eloop->registerReservedChild(this, child); }