From eee3bdf9001f3dddc408ee8155a4658219dc0faf Mon Sep 17 00:00:00 2001 From: Davin McCall Date: Mon, 13 Jun 2016 22:20:28 +0100 Subject: [PATCH] Update dasynq --- src/dasync/dasync-epoll.h | 11 -- src/dasync/dasync-flags.h | 13 ++ src/dasync/dasync-kqueue.h | 328 +++++++++++++++---------------------- src/dasync/dasync.h | 200 +++++++++++++++------- 4 files changed, 288 insertions(+), 264 deletions(-) create mode 100644 src/dasync/dasync-flags.h diff --git a/src/dasync/dasync-epoll.h b/src/dasync/dasync-epoll.h index 8411fd3..5bd3e5e 100644 --- a/src/dasync/dasync-epoll.h +++ b/src/dasync/dasync-epoll.h @@ -14,17 +14,6 @@ namespace dasync { -// Event type bits -constexpr unsigned int in_events = 1; -constexpr unsigned int out_events = 2; -constexpr unsigned int err_events = 4; - -constexpr unsigned int one_shot = 8; - -// Masks: -constexpr unsigned int IO_EVENTS = in_events | out_events; - - template class EpollLoop; class EpollTraits diff --git a/src/dasync/dasync-flags.h b/src/dasync/dasync-flags.h new file mode 100644 index 0000000..441c34a --- /dev/null +++ b/src/dasync/dasync-flags.h @@ -0,0 +1,13 @@ +namespace dasync { + +// Event type bits +constexpr unsigned int in_events = 1; +constexpr unsigned int out_events = 2; +constexpr unsigned int err_events = 4; + +constexpr unsigned int one_shot = 8; + +// Masks: +constexpr unsigned int IO_EVENTS = in_events | out_events; + +} diff --git a/src/dasync/dasync-kqueue.h b/src/dasync/dasync-kqueue.h index 0ccbc58..5dab0c5 100644 --- a/src/dasync/dasync-kqueue.h +++ b/src/dasync/dasync-kqueue.h @@ -22,14 +22,6 @@ extern "C" { namespace dasync { -// Event type bits -constexpr unsigned int in_events = 1; -constexpr unsigned int out_events = 2; -constexpr unsigned int err_events = 4; - -constexpr unsigned int one_shot = 8; - - template class KqueueLoop; class KqueueTraits @@ -73,14 +65,22 @@ class KqueueTraits return fd; } }; + + const static bool has_separate_rw_fd_watches = true; }; #if defined(__OpenBSD__) -// OpenBSD has no sigtimedwait (or sigwaitinfo) but does have -// "__thrsigdivert" - WTF. -static inline int sigtimedwait(const sigset_t *ssp, siginfo_t *info, const struct timespec *timeout) +// OpenBSD has no sigtimedwait (or sigwaitinfo) but does have "__thrsigdivert", which is +// essentially an incomplete version of the same thing. Discussion with OpenBSD developer +// Ted Unangst suggested that the siginfo_t structure returned might not always have all +// fields set correctly. Furthermore there is a bug such that specifying a zero timeout (or +// indeed any timeout less than a tick) results in NO timeout. +static inline int sigtimedwait(const sigset_t *ssp, siginfo_t *info, struct timespec *timeout) { - // return __syscall(SYS___thrsigdivert, *ssp, info, timeout); + // We know that we're only called with a timeout of 0 (which doesn't work properly) and + // that we safely overwrite the timeout. So, we set tv_nsec to an invalid value, which + // will cause EINVAL to be returned, but will still pick up any pending signals *first*. + timeout->tv_nsec = 1000000001; return __thrsigdivert(*ssp, info, timeout); } #endif @@ -88,7 +88,22 @@ static inline int sigtimedwait(const sigset_t *ssp, siginfo_t *info, const struc template class KqueueLoop : public Base { int kqfd; // kqueue fd + sigset_t sigmask; // enabled signal watch mask + // Map of signal number to user data pointer. If kqueue had been better thought-through, + // we shouldn't need this. Although we can associate user data with an EVFILT_SIGNAL kqueue + // filter, the problem is that the kqueue signal report *coexists* with the regular signal + // delivery mechanism without having any connection to it. Whereas regular signals can be + // queued (especially "realtime" signals, via sigqueue()), kqueue just maintains a counter + // of delivery attempts and clears this when we read the event. What this means is that + // kqueue won't necessarily tell us if signals are pending, in the case that: + // 1) it already reported the attempted signal delivery and + // 2) more than one of the same signal was pending at that time and + // 3) no more deliveries of the same signal have been attempted in the meantime. + // Of course, if kqueue doesn't report the signal, then it doesn't give us the data associated + // with the event, so we need to maintain that separately too: + std::unordered_map sigdataMap; + // Base contains: // lock - a lock that can be used to protect internal structure. // receive*() methods will be called with lock held. @@ -103,9 +118,8 @@ template class KqueueLoop : public Base std::lock_guard guard(Base::lock); for (int i = 0; i < r; i++) { - if (events[i].filter = EVFILT_SIGNAL) { + if (events[i].filter == EVFILT_SIGNAL) { SigInfo siginfo; - //siginfo_t siginfo; sigset_t sset; sigemptyset(&sset); sigaddset(&sset, events[i].ident); @@ -113,36 +127,31 @@ template class KqueueLoop : public Base timeout.tv_sec = 0; timeout.tv_nsec = 0; if (sigtimedwait(&sset, &siginfo.info, &timeout) > 0) { - Base::receiveSignal(siginfo, (void *)events[i].udata); + Base::receiveSignal(siginfo, (void *)events[i].udata); + } + + if (events[i].ident != SIGCHLD) { + sigdelset(&sigmask, events[i].ident); + events[i].flags = EV_DISABLE; + } + else { + // TODO can we remove this SIGCHLD hack? + events[i].flags = EV_ENABLE; } - // OpenBSD doesn't have sigwaitinfo... - // TODO we can probably do better by establishing a handler - // and letting it run (using eg sigsuspend()) but that requires - // a global area for returning the siginfo_t data and that in - // turn requires a global mutex. :( - //sigset_t sigset; - //sigpending(&sigset); - //SigInfo siginfo; - //if (sigismember(&sigset, events[i].ident)) { - // consume signal - // int rsig; - // sigemptyset(&sigset); - // sigaddset(&sigset, events[i].ident); - // sigwait(&sigset, &rsig); - // siginfo.info.si_signo = events[i].ident; - // Base::receiveSignal(siginfo, (void *)events[i].udata); - //} } - - //else { + else { // int flags = 0; // (events[i].events & EPOLLIN) && (flags |= in_events); // (events[i].events & EPOLLHUP) && (flags |= in_events); // (events[i].events & EPOLLOUT) && (flags |= out_events); // (events[i].events & EPOLLERR) && (flags |= err_events); // Base::receiveFdEvent(*this, FD_r(), ptr, flags); - //} + events[i].flags = EV_DISABLE; + } } + + // Now we disable all received events, to simulate EV_DISPATCH: + kevent(kqfd, events, r, nullptr, 0, nullptr); } public: @@ -158,7 +167,8 @@ template class KqueueLoop : public Base if (kqfd == -1) { throw std::system_error(errno, std::system_category()); } - //sigemptyset(&sigmask); + sigemptyset(&sigmask); + Base::init(this); } ~KqueueLoop() @@ -166,6 +176,13 @@ template class KqueueLoop : public Base close(kqfd); } + void disableFilter(short filterType, uintptr_t ident) + { + struct kevent kev; + EV_SET(&kev, ident, filterType, EV_DISABLE, 0, 0, 0); + kevent(kqfd, &kev, 1, nullptr, 0, nullptr); + } + // flags: in_events | out_events void addFdWatch(int fd, void *userdata, int flags) { @@ -243,52 +260,54 @@ template class KqueueLoop : public Base //} } + void disableFdWatch_nolock(int fd) + { + // TODO + } + // Note signal should be masked before call. void addSignalWatch(int signo, void *userdata) { - //std::lock_guard guard(Base::lock); - - //sigdataMap[signo] = userdata; - - // Modify the signal fd to watch the new signal - //bool was_no_sigfd = (sigfd == -1); - //sigaddset(&sigmask, signo); - //sigfd = signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); - //if (sigfd == -1) { - // throw new std::system_error(errno, std::system_category()); - //} + std::lock_guard guard(Base::lock); - //if (was_no_sigfd) { - // Add the signalfd to the epoll set. - // struct epoll_event epevent; - // epevent.data.ptr = &sigfd; - // epevent.events = EPOLLIN; - // No need for EPOLLONESHOT - we can pull the signals out - // as we see them. - // if (epoll_ctl(epfd, EPOLL_CTL_ADD, sigfd, &epevent) == -1) { - // close(sigfd); - // throw new std::system_error(errno, std::system_category()); - // } - //} + sigdataMap[signo] = userdata; + sigaddset(&sigmask, signo); + + struct kevent evt; + EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ADD, 0, 0, userdata); + // TODO use EV_DISPATCH if available (not on OpenBSD) + + if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) { + throw new std::system_error(errno, std::system_category()); + } } // Note, called with lock held: void rearmSignalWatch_nolock(int signo) noexcept { - //sigaddset(&sigmask, signo); - //signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); + sigaddset(&sigmask, signo); + + struct kevent evt; + EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ENABLE, 0, 0, 0); + // TODO use EV_DISPATCH if available (not on OpenBSD) + + kevent(kqfd, &evt, 1, nullptr, 0, nullptr); } void removeSignalWatch_nolock(int signo) noexcept { - //sigdelset(&sigmask, signo); - //signalfd(sigfd, &sigmask, 0); + sigdelset(&sigmask, signo); + + struct kevent evt; + EV_SET(&evt, signo, EVFILT_SIGNAL, EV_DELETE, 0, 0, 0); + + kevent(kqfd, &evt, 1, nullptr, 0, nullptr); } void removeSignalWatch(int signo) noexcept { - //std::lock_guard guard(Base::lock); - //removeSignalWatch_nolock(signo); + std::lock_guard guard(Base::lock); + removeSignalWatch_nolock(signo); } // If events are pending, process an unspecified number of them. @@ -303,14 +322,47 @@ template class KqueueLoop : public Base // pending. void pullEvents(bool do_wait) { - //epoll_event events[16]; - //int r = epoll_wait(epfd, events, 16, do_wait ? -1 : 0); - //if (r == -1 || r == 0) { + // We actually need to check pending signals, since + // kqueue can count signals as they are delivered but the count is + // cleared when we poll the kqueue, meaning that signals might still + // be pending if they were queued multiple times at the last poll. + + // TODO we should only poll for signals that *have* been reported + // as being raised more than once prior via kevent, rather than all + // signals that have been registered - in many cases that will allow + // us to skip the sigtimedwait call altogether. + + { + std::lock_guard guard(Base::lock); + struct timespec timeout; + timeout.tv_sec = 0; + timeout.tv_nsec = 0; + SigInfo siginfo; + int rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout); + while (rsigno > 0) { + // TODO avoid this hack for SIGCHLD somehow + if (rsigno != SIGCHLD) { + sigdelset(&sigmask, rsigno); + // TODO accumulate and disable multiple filters with a single kevents call + // rather than disabling each individually + disableFilter(EVFILT_SIGNAL, rsigno); + } + Base::receiveSignal(siginfo, sigdataMap[rsigno]); + rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout); + } + } + + struct kevent events[16]; + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 0; + int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts); + if (r == -1 || r == 0) { // signal or no events - // return; - //} - - //processEvents(events, r); + return; + } + + processEvents(events, r); } // If events are pending, process one of them. @@ -321,14 +373,18 @@ template class KqueueLoop : public Base // pending. void pullOneEvent(bool do_wait) { - //epoll_event events[1]; - //int r = epoll_wait(epfd, events, 1, do_wait ? -1 : 0); - //if (r == -1 || r == 0) { + // TODO must check for pending signals as per pullEvents() + struct kevent events[1]; + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 0; + int r = kevent(kqfd, nullptr, 0, events, 1, do_wait ? nullptr : &ts); + if (r == -1 || r == 0) { // signal or no events - // return; - //} + return; + } - //processEvents(events, r); + processEvents(events, r); } // Interrupt any current poll operation (pullEvents/pullOneEvent), causing @@ -339,120 +395,4 @@ template class KqueueLoop : public Base } }; -// Map of pid_t to void *, with possibility of reserving entries so that mappings can -// be later added with no danger of allocator exhaustion (bad_alloc). -class pid_map -{ - using pair = std::pair; - std::unordered_map base_map; - std::vector backup_vector; - - // Number of entries in backup_vector that are actually in use (as opposed - // to simply reserved): - int backup_size = 0; - - public: - using entry = std::pair; - - entry get(pid_t key) noexcept - { - auto it = base_map.find(key); - if (it == base_map.end()) { - // Not in map; look in vector - for (int i = 0; i < backup_size; i++) { - if (backup_vector[i].first == key) { - return entry(true, backup_vector[i].second); - } - } - - return entry(false, nullptr); - } - - return entry(true, it->second); - } - - entry erase(pid_t key) noexcept - { - auto iter = base_map.find(key); - if (iter != base_map.end()) { - entry r(true, iter->second); - base_map.erase(iter); - return r; - } - for (int i = 0; i < backup_size; i++) { - if (backup_vector[i].first == key) { - entry r(true, backup_vector[i].second); - backup_vector.erase(backup_vector.begin() + i); - return r; - } - } - return entry(false, nullptr); - } - - // Throws bad_alloc on reservation failure - void reserve() - { - backup_vector.resize(backup_vector.size() + 1); - } - - void add(pid_t key, void *val) // throws std::bad_alloc - { - base_map[key] = val; - } - - void add_from_reserve(pid_t key, void *val) noexcept - { - try { - base_map[key] = val; - backup_vector.resize(backup_vector.size() - 1); - } - catch (std::bad_alloc &) { - // We couldn't add into the map, use the reserve: - backup_vector[backup_size++] = pair(key, val); - } - } -}; - -template class ChildProcEvents : public Base -{ - private: - pid_map child_waiters; - - using SigInfo = typename Base::SigInfo; - - protected: - void receiveSignal(SigInfo &siginfo, void *userdata) - { - if (siginfo.get_signo() == SIGCHLD) { - int status; - pid_t child; - while ((child = waitpid(-1, &status, WNOHANG)) > 0) { - pid_map::entry ent = child_waiters.erase(child); - if (ent.first) { - Base::receiveChildStat(child, status, ent.second); - } - } - } - else { - Base::receiveSignal(siginfo, userdata); - } - } - - public: - void reserveChildWatch() - { - child_waiters.reserve(); - } - - void addChildWatch(pid_t child, void *val) - { - child_waiters.add(child, val); - } - - void addReservedChildWatch(pid_t child, void *val) noexcept - { - child_waiters.add_from_reserve(child, val); - } -}; - } // end namespace diff --git a/src/dasync/dasync.h b/src/dasync/dasync.h index ce41252..807f88e 100644 --- a/src/dasync/dasync.h +++ b/src/dasync/dasync.h @@ -9,6 +9,8 @@ #define HAVE_EPOLL 1 #endif +#include "dasync-flags.h" + #if defined(HAVE_KQUEUE) #include "dasync-kqueue.h" #include "dasync-childproc.h" @@ -63,11 +65,12 @@ enum class Rearm REARM, /** Disarm the event watcher so that it receives no further events, until it is re-armed explicitly */ DISARM, + /** Leave in current armed/disarmed state */ + NOOP, /** Remove the event watcher (and call "removed" callback) */ REMOVE, - /** Leave in current state */ - NOOP -// TODO: add a REMOVED option, which means, "I removed myself, DON'T TOUCH ME" + /** The watcher has been removed - don't touch it! */ + REMOVED // TODO: add a REQUEUE option, which means, "I didn't complete input/output, run me again soon" }; @@ -335,6 +338,11 @@ namespace dprivate { } } + bool isQueued(BaseWatcher *bwatcher) + { + return bwatcher->prev != nullptr; + } + void dequeueWatcher(BaseWatcher *bwatcher) { if (bwatcher->prev == bwatcher) { @@ -351,11 +359,6 @@ namespace dprivate { bwatcher->next = nullptr; } - bool isQueued(BaseWatcher *bwatcher) - { - return bwatcher->prev != nullptr; - } - protected: T_Mutex lock; @@ -428,10 +431,13 @@ namespace dprivate { lock.lock(); + // TODO this needs to handle multi-watch (BidiFdWatcher) properly + if (watcher->active) { // If the watcher is active, set deleteme true; the watcher will be removed // at the end of current processing (i.e. when active is set false). watcher->deleteme = true; + lock.unlock(); } else { // Actually do the delete. @@ -439,11 +445,45 @@ namespace dprivate { dequeueWatcher(watcher); } - // TODO call this without lock? + lock.unlock(); watcher->watchRemoved(); } + } + + void issueDelete(BaseBidiFdWatcher *watcher) noexcept + { + lock.lock(); + + if (watcher->active) { + watcher->deleteme = true; + } + else { + if (isQueued(watcher)) { + dequeueWatcher(watcher); + } + + watcher->read_removed = true; + } - lock.unlock(); + BaseWatcher *secondary = &(watcher->outWatcher); + if (secondary->active) { + secondary->deleteme = true; + } + else { + if (isQueued(secondary)) { + dequeueWatcher(secondary); + } + + watcher->write_removed = true; + } + + if (watcher->read_removed && watcher->write_removed) { + lock.unlock(); + watcher->watchRemoved(); + } + else { + lock.unlock(); + } } }; } @@ -594,15 +634,15 @@ template class EventLoop } else { loop_mech.removeFdWatch(fd); - - waitqueue_node qnode; - getAttnLock(qnode); - - EventDispatch & ed = (EventDispatch &) loop_mech; - ed.issueDelete(callback); - - releaseLock(qnode); } + + waitqueue_node qnode; + getAttnLock(qnode); + + EventDispatch & ed = (EventDispatch &) loop_mech; + ed.issueDelete(callback); + + releaseLock(qnode); } void reserveChildWatch(BaseChildWatcher *callBack) @@ -619,6 +659,13 @@ template class EventLoop { loop_mech.addReservedChildWatch(child, callBack); } + + void dequeueWatcher(BaseWatcher *watcher) noexcept + { + if (loop_mech.isQueued(watcher)) { + loop_mech.dequeueWatcher(watcher); + } + } // Acquire the attention lock (when held, ensures that no thread is polling the AEN // mechanism). @@ -795,11 +842,32 @@ template class EventLoop pqueue->active = true; active = true; - ed.lock.unlock(); - Rearm rearmType = Rearm::NOOP; bool is_multi_watch = false; BaseBidiFdWatcher *bbfw = nullptr; + + // Read/manipulate watch_flags (if necessary) *before* we release the lock: + if (pqueue->watchType == WatchType::FD) { + BaseFdWatcher *bfw = static_cast(pqueue); + bbfw = static_cast(bfw); + is_multi_watch = bfw->watch_flags & dprivate::multi_watch; + if (! LoopTraits::has_separate_rw_fd_watches && is_multi_watch) { + // Clear the input watch flags to avoid enabling read watcher while active: + bfw->watch_flags &= ~in_events; + } + } + else if (pqueue->watchType == WatchType::SECONDARYFD) { + is_multi_watch = true; + char * rp = (char *)pqueue; + rp -= offsetof(BaseBidiFdWatcher, outWatcher); + bbfw = (BaseBidiFdWatcher *)rp; + if (! LoopTraits::has_separate_rw_fd_watches) { + bbfw->watch_flags &= ~out_events; + } + } + + ed.lock.unlock(); + // (Above variables are initialised only to silence compiler warnings). // Note that we select actions based on the type of the watch, as determined by the watchType @@ -816,11 +884,9 @@ template class EventLoop } case WatchType::FD: { BaseFdWatcher *bfw = static_cast(pqueue); - is_multi_watch = bfw->watch_flags & dprivate::multi_watch; if (is_multi_watch) { // The primary watcher for a multi-watch watcher is queued for // read events. - bbfw = static_cast(bfw); rearmType = bbfw->readReady(this, bfw->watch_fd); } else { @@ -832,57 +898,54 @@ template class EventLoop BaseChildWatcher *bcw = static_cast(pqueue); bcw->gotTermStat(this, bcw->watch_pid, bcw->child_status); // Child watches automatically remove: + // TODO what if they want to return REMOVED... rearmType = Rearm::REMOVE; break; } case WatchType::SECONDARYFD: { // first construct a pointer to the main watcher: - is_multi_watch = true; - char * rp = (char *)pqueue; - rp -= offsetof(BaseBidiFdWatcher, outWatcher); - bbfw = (BaseBidiFdWatcher *)rp; rearmType = bbfw->writeReady(this, bbfw->watch_fd); break; } default: ; } - + ed.lock.lock(); - pqueue->active = false; - if (pqueue->deleteme) { - // We don't want a watch that is marked "deleteme" to re-arm itself. - // NOOP flags that the state is managed externally, so we don't adjust that. - if (rearmType != Rearm::NOOP) { + // (if REMOVED, we must not touch pqueue at all) + if (rearmType != Rearm::REMOVED) { + + pqueue->active = false; + if (pqueue->deleteme) { + // We don't want a watch that is marked "deleteme" to re-arm itself. rearmType = Rearm::REMOVE; } + switch (pqueue->watchType) { + case WatchType::SIGNAL: + processSignalRearm(static_cast(pqueue), rearmType); + break; + case WatchType::FD: + rearmType = processFdRearm(static_cast(pqueue), rearmType, is_multi_watch); + break; + case WatchType::SECONDARYFD: + rearmType = processSecondaryRearm(bbfw, rearmType); + break; + default: ; + } + + if (pqueue->deleteme) rearmType = Rearm::REMOVE; // makes the watchRemoved() callback get called. + + if (rearmType == Rearm::REMOVE) { + ed.lock.unlock(); + (is_multi_watch ? bbfw : pqueue)->watchRemoved(); + ed.lock.lock(); + } } - switch (pqueue->watchType) { - case WatchType::SIGNAL: - processSignalRearm(static_cast(pqueue), rearmType); - break; - case WatchType::FD: - rearmType = processFdRearm(static_cast(pqueue), rearmType, is_multi_watch); - break; - case WatchType::SECONDARYFD: - rearmType = processSecondaryRearm(bbfw, rearmType); - break; - default: ; - } - - if (pqueue->deleteme) rearmType = Rearm::REMOVE; // makes the watchRemoved() callback get called. - - ed.lock.unlock(); - - if (rearmType == Rearm::REMOVE) { - (is_multi_watch ? bbfw : pqueue)->watchRemoved(); - } - - ed.lock.lock(); pqueue = ed.pullEvent(); } + ed.lock.unlock(); return active; } @@ -995,12 +1058,19 @@ class PosixFdWatcher : private dprivate::BaseFdWatcher void setEnabled(EventLoop *eloop, bool enable) noexcept { - eloop->setFdEnabled(this, this->watch_fd, this->watch_flags, enable); + std::lock_guard guard(eloop->getBaseLock()); + eloop->setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable); + if (! enable) { + eloop->dequeueWatcher(this); + } } // virtual Rearm gotEvent(EventLoop *, int fd, int flags) = 0; }; +// A Bi-directional file descriptor watcher with independent read- and write- channels. +// This watcher type has two event notification methods which can both potentially be +// active at the same time. template class PosixBidiFdWatcher : private dprivate::BaseBidiFdWatcher { @@ -1019,16 +1089,25 @@ class PosixBidiFdWatcher : private dprivate::BaseBidiFdWatcher if (LoopTraits::has_separate_rw_fd_watches) { dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher; eloop->setFdEnabled_nolock(watcher, this->watch_fd, events | one_shot, b); + if (! b) { + eloop->dequeueWatcher(watcher); + } } else { eloop->setFdEnabled_nolock(this, this->watch_fd, (this->watch_flags & (in_events | out_events)) | one_shot, (this->watch_flags & (in_events | out_events)) != 0); + if (! b) { + dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher; + eloop->dequeueWatcher(watcher); + } } } protected: + // TODO if a watch is disabled and currently queued, we should de-queue it. + void setInWatchEnabled(EventLoop *eloop, bool b) noexcept { eloop->getBaseLock().lock(); @@ -1044,13 +1123,16 @@ class PosixBidiFdWatcher : private dprivate::BaseBidiFdWatcher } // Set the watch flags, which enables/disables both the in-watch and the out-watch accordingly. - // This can only be called from the event handler callbacks (readReady/writeReady) and only if - // set{In|Out}WatchEnabled will not be called concurrently. + // + // Concurrency: this method can only be called if + // - it does not enable a watcher that might currently be active + /// - unless the event loop will not be polled while the watcher is active. + // (i.e. it is ok to call setWatchFlags from within the readReady/writeReady handlers if no other + // thread will poll the event loop; it is ok to *dis*able a watcher that might be active). void setWatchFlags(EventLoop * eloop, int newFlags) { + std::lock_guard guard(eloop->getBaseLock()); if (LoopTraits::has_separate_rw_fd_watches) { - // (In this case, this function is fully thread-safe) - std::lock_guard guard(eloop->getBaseLock()); setWatchEnabled(eloop, true, (newFlags & in_events) != 0); setWatchEnabled(eloop, false, (newFlags & out_events) != 0); } -- 2.25.1