From e921e45368c93ce83ae2c8f6d804723496840e7f Mon Sep 17 00:00:00 2001 From: Davin McCall Date: Fri, 9 Feb 2018 22:04:07 +0000 Subject: [PATCH] Upgrade bundled Dasynq to 1.1.2. --- src/dasynq/dasynq-basewatchers.h | 20 +- src/dasynq/dasynq-childproc.h | 4 + src/dasynq/dasynq-config.h | 5 + src/dasynq/dasynq-itimer.h | 1 + src/dasynq/dasynq-kqueue-macos.h | 441 +++++++++++++++++++++++++++++++ src/dasynq/dasynq-select.h | 146 +--------- src/dasynq/dasynq-signal.h | 215 +++++++++++++++ src/dasynq/dasynq.h | 348 +++++++++++++++--------- 8 files changed, 912 insertions(+), 268 deletions(-) create mode 100644 src/dasynq/dasynq-kqueue-macos.h create mode 100644 src/dasynq/dasynq-signal.h diff --git a/src/dasynq/dasynq-basewatchers.h b/src/dasynq/dasynq-basewatchers.h index 67144fb..fc5f6f5 100644 --- a/src/dasynq/dasynq-basewatchers.h +++ b/src/dasynq/dasynq-basewatchers.h @@ -75,7 +75,7 @@ namespace dprivate { TIMER }; - template class event_dispatch; + template class event_dispatch; // For FD watchers: // Use this watch flag to indicate that in and out events should be reported separately, @@ -139,10 +139,10 @@ namespace dprivate { }; // Base signal event - not part of public API - template + template class base_signal_watcher : public base_watcher { - template friend class event_dispatch; + template friend class event_dispatch; template friend class dasynq::event_loop; protected: @@ -154,10 +154,9 @@ namespace dprivate { typedef siginfo_t &siginfo_p; }; - template class base_fd_watcher : public base_watcher { - template friend class event_dispatch; + template friend class event_dispatch; template friend class dasynq::event_loop; protected: @@ -175,10 +174,9 @@ namespace dprivate { base_fd_watcher() noexcept : base_watcher(watch_type_t::FD) { } }; - template - class base_bidi_fd_watcher : public base_fd_watcher + class base_bidi_fd_watcher : public base_fd_watcher { - template friend class event_dispatch; + template friend class event_dispatch; template friend class dasynq::event_loop; base_bidi_fd_watcher(const base_bidi_fd_watcher &) = delete; @@ -195,10 +193,9 @@ namespace dprivate { int write_removed : 1; // write watch removed? }; - template class base_child_watcher : public base_watcher { - template friend class event_dispatch; + template friend class event_dispatch; template friend class dasynq::event_loop; protected: @@ -210,10 +207,9 @@ namespace dprivate { }; - template class base_timer_watcher : public base_watcher { - template friend class event_dispatch; + template friend class event_dispatch; template friend class dasynq::event_loop; protected: diff --git a/src/dasynq/dasynq-childproc.h b/src/dasynq/dasynq-childproc.h index c2da99c..903ea39 100644 --- a/src/dasynq/dasynq-childproc.h +++ b/src/dasynq/dasynq-childproc.h @@ -1,4 +1,8 @@ +#include +#include + #include + #include "dasynq-btree_set.h" namespace dasynq { diff --git a/src/dasynq/dasynq-config.h b/src/dasynq/dasynq-config.h index cea00c3..b38b387 100644 --- a/src/dasynq/dasynq-config.h +++ b/src/dasynq/dasynq-config.h @@ -44,6 +44,11 @@ #if ! defined(DASYNQ_HAVE_KQUEUE) #if defined(__OpenBSD__) || defined(__APPLE__) || defined(__FreeBSD__) #define DASYNQ_HAVE_KQUEUE 1 +#if defined(__APPLE__) +// kqueue on macos has "issues". See extra/macos-kqueue-bug. There is an alternate Dasyqn kqueue backend +// which avoids the issue, which is enabled via DASYNQ_KQUEUE_MACOS_WORKAROUND. +#define DASYNQ_KQUEUE_MACOS_WORKAROUND 1 +#endif #endif #endif diff --git a/src/dasynq/dasynq-itimer.h b/src/dasynq/dasynq-itimer.h index c4d42a1..9ecf828 100644 --- a/src/dasynq/dasynq-itimer.h +++ b/src/dasynq/dasynq-itimer.h @@ -1,6 +1,7 @@ #include #include +#include #include #include diff --git a/src/dasynq/dasynq-kqueue-macos.h b/src/dasynq/dasynq-kqueue-macos.h new file mode 100644 index 0000000..844a5f5 --- /dev/null +++ b/src/dasynq/dasynq-kqueue-macos.h @@ -0,0 +1,441 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +#include "dasynq-config.h" +#include "dasynq-signal.h" + +// "kqueue"-based event loop mechanism. +// +// kqueue is available on BSDs and Mac OS X, though there are subtle differences from OS to OS. +// +// kqueue supports watching file descriptors (input and output as separate watches only), +// signals, child processes, and timers. Unfortunately support for the latter two is imperfect; +// it is not possible to reserve process watches in advance; timers can only be active, count +// down immediately when created, and cannot be reset to another time. For timers especially +// the problems are significant: we can't allocate timers in advance, and we can't even feasibly +// manage our own timer queue via a single kqueue-backed timer. Therefore, an alternate timer +// mechanism must be used together with kqueue. + +namespace dasynq { + +template class kqueue_loop; + +class macos_kqueue_traits : public signal_traits +{ + template friend class macos_kqueue_loop; + + public: + + class fd_r; + + // File descriptor optional storage. If the mechanism can return the file descriptor, this + // class will be empty, otherwise it can hold a file descriptor. + class fd_s { + public: + fd_s(int) { } + + DASYNQ_EMPTY_BODY + }; + + // File descriptor reference (passed to event callback). If the mechanism can return the + // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor + // must be stored in an fd_s instance. + class fd_r { + int fd; + public: + int getFd(fd_s ss) + { + return fd; + } + fd_r(int nfd) : fd(nfd) + { + } + }; + + constexpr static bool has_bidi_fd_watch = false; + constexpr static bool has_separate_rw_fd_watches = true; + constexpr static bool interrupt_after_fd_add = false; + constexpr static bool supports_non_oneshot_fd = false; +}; + +#if _POSIX_REALTIME_SIGNALS > 0 +static inline void prepare_signal(int signo) { } +static inline void unprep_signal(int signo) { } + +inline bool get_siginfo(int signo, siginfo_t *siginfo) +{ + struct timespec timeout; + timeout.tv_sec = 0; + timeout.tv_nsec = 0; + + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, signo); + return (sigtimedwait(&mask, siginfo, &timeout) != -1); +} +#else + +// If we have no sigtimedwait implementation, we have to retrieve signal data by establishing a +// signal handler. + +// We need to declare and define a non-static data variable, "siginfo_p", in this header, without +// violating the "one definition rule". The only way to do that is via a template, even though we +// don't otherwise need a template here: +template class sig_capture_templ +{ + public: + static siginfo_t * siginfo_p; + + static void signalHandler(int signo, siginfo_t *siginfo, void *v) + { + *siginfo_p = *siginfo; + } +}; +template siginfo_t * sig_capture_templ::siginfo_p = nullptr; + +using sig_capture = sig_capture_templ<>; + +inline void prepare_signal(int signo) +{ + struct sigaction the_action; + the_action.sa_sigaction = sig_capture::signalHandler; + the_action.sa_flags = SA_SIGINFO; + sigfillset(&the_action.sa_mask); + + sigaction(signo, &the_action, nullptr); +} + +inline void unprep_signal(int signo) +{ + signal(signo, SIG_DFL); +} + +inline bool get_siginfo(int signo, siginfo_t *siginfo) +{ + sig_capture::siginfo_p = siginfo; + + sigset_t mask; + sigfillset(&mask); + sigdelset(&mask, signo); + sigsuspend(&mask); + return true; +} + +#endif + +template class macos_kqueue_loop : public signal_events +{ + int kqfd; // kqueue fd + + // Base contains: + // lock - a lock that can be used to protect internal structure. + // receive*() methods will be called with lock held. + // receive_signal(sigdata_t &, user *) noexcept + // receive_fd_event(fd_r, user *, int flags) noexcept + + using fd_r = typename macos_kqueue_traits::fd_r; + + // The flag to specify poll() semantics for regular file readiness: that is, we want + // ready-for-read to be returned even at end of file: +#if defined(NOTE_FILE_POLL) + // FreeBSD: + constexpr static int POLL_SEMANTICS = NOTE_FILE_POLL; +#else + // Note that macOS has an "EV_POLL" defined that looks like it should give poll semantics + // when passed as a flag. However, it is filtered at the syscall entry so we cannot use it in + // kqueue. (The kernel uses it internally to implement poll()). + constexpr static int POLL_SEMANTICS = 0; +#endif + + void process_events(struct kevent *events, int r) + { + std::lock_guard guard(Base::lock); + + for (int i = 0; i < r; i++) { + if (events[i].filter == EVFILT_READ || events[i].filter == EVFILT_WRITE) { + int flags = events[i].filter == EVFILT_READ ? IN_EVENTS : OUT_EVENTS; + auto r = Base::receive_fd_event(*this, fd_r(events[i].ident), events[i].udata, flags); + if (std::get<0>(r) == 0) { + // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for + // another connection). + events[i].flags = EV_DISABLE | EV_CLEAR; + } + else { + events[i].flags = EV_ENABLE; + } + } + else { + events[i].flags = EV_DISABLE; + } + } + + // Now we disable all received events, to simulate EV_DISPATCH: + kevent(kqfd, events, r, nullptr, 0, nullptr); + } + + public: + + /** + * kqueue_loop constructor. + * + * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised. + */ + macos_kqueue_loop() + { + kqfd = kqueue(); + if (kqfd == -1) { + throw std::system_error(errno, std::system_category()); + } + Base::init(this); + } + + ~macos_kqueue_loop() + { + close(kqfd); + } + + void set_filter_enabled(short filterType, uintptr_t ident, void *udata, bool enable) + { + // Note, on OpenBSD enabling or disabling filter will not alter the filter parameters (udata etc); + // on OS X however, it will. Therefore we set udata here (to the same value as it was originally + // set) in order to work correctly on both kernels. + struct kevent kev; + int fflags = (filterType == EVFILT_READ) ? POLL_SEMANTICS : 0; + EV_SET(&kev, ident, filterType, enable ? EV_ENABLE : EV_DISABLE, fflags, 0, udata); + kevent(kqfd, &kev, 1, nullptr, 0, nullptr); + } + + void remove_filter(short filterType, uintptr_t ident) + { + struct kevent kev; + EV_SET(&kev, ident, filterType, EV_DELETE, 0, 0, 0); + kevent(kqfd, &kev, 1, nullptr, 0, nullptr); + } + + // fd: file descriptor to watch + // userdata: data to associate with descriptor + // flags: IN_EVENTS | OUT_EVENTS | ONE_SHOT + // (only one of IN_EVENTS/OUT_EVENTS can be specified) + // soft_fail: true if unsupported file descriptors should fail by returning false instead + // of throwing an exception + // returns: true on success; false if file descriptor type isn't supported and emulate == true + // throws: std::system_error or std::bad_alloc on failure + bool add_fd_watch(int fd, void *userdata, int flags, bool enabled = true, bool emulate = false) + { + short filter = (flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE; + + if (filter == EVFILT_READ && POLL_SEMANTICS == 0 && emulate) { + // We can't request poll semantics, so check for regular file: + struct stat statbuf; + if (fstat(fd, &statbuf) == -1) { + throw new std::system_error(errno, std::system_category()); + } + if ((statbuf.st_mode & S_IFMT) == S_IFREG) { + // Regular file: emulation required + return false; + } + } + + int fflags = (filter == EVFILT_READ) ? POLL_SEMANTICS : 0; + + struct kevent kev; + EV_SET(&kev, fd, filter, EV_ADD | (enabled ? 0 : EV_DISABLE), fflags, 0, userdata); + if (kevent(kqfd, &kev, 1, nullptr, 0, nullptr) == -1) { + // Note that kqueue supports EVFILT_READ on regular file fd's, but not EVFILT_WRITE. + if (filter == EVFILT_WRITE && errno == EINVAL && emulate) { + return false; // emulate + } + throw new std::system_error(errno, std::system_category()); + } + return true; + } + + // returns: 0 on success + // IN_EVENTS if in watch requires emulation + // OUT_EVENTS if out watch requires emulation + int add_bidi_fd_watch(int fd, void *userdata, int flags, bool emulate = false) + { +#ifdef EV_RECEIPT + struct kevent kev[2]; + struct kevent kev_r[2]; + short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT; + short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT; + EV_SET(&kev[0], fd, EVFILT_READ, rflags, POLL_SEMANTICS, 0, userdata); + EV_SET(&kev[1], fd, EVFILT_WRITE, wflags, 0, 0, userdata); + + int r = kevent(kqfd, kev, 2, kev_r, 2, nullptr); + + if (r == -1) { + throw new std::system_error(errno, std::system_category()); + } + + // Some possibilities: + // - both ends failed. We'll throw an error rather than allowing emulation. + // - read watch failed, write succeeded : should not happen. + // - read watch added, write failed: if emulate == true, succeed; + // if emulate == false, remove read and fail. + + if (kev_r[0].data != 0) { + // read failed + throw new std::system_error(kev_r[0].data, std::system_category()); + } + + if (kev_r[1].data != 0) { + if (emulate) { + // We can emulate, but, do we have correct semantics? + if (POLL_SEMANTICS != 0) { + return OUT_EVENTS; + } + + // if we can't get poll semantics, emulate for read as well: + // first remove read watch: + EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata); + kevent(kqfd, kev, 1, nullptr, 0, nullptr); + return IN_EVENTS | OUT_EVENTS; + } + // remove read watch + EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata); + kevent(kqfd, kev, 1, nullptr, 0, nullptr); + // throw exception + throw new std::system_error(kev_r[1].data, std::system_category()); + } + + return 0; +#else + // OpenBSD doesn't have EV_RECEIPT: install the watches one at a time + struct kevent kev[1]; + + short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE); + short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE); + EV_SET(&kev[0], fd, EVFILT_READ, rflags, 0, 0, userdata); + + int r = kevent(kqfd, kev, 1, nullptr, 0, nullptr); + + if (r == -1) { + throw new std::system_error(errno, std::system_category()); + } + + EV_SET(&kev[0], fd, EVFILT_WRITE, wflags, 0, 0, userdata); + + r = kevent(kqfd, kev, 1, nullptr, 0, nullptr); + + if (r == -1) { + if (emulate) { + return OUT_EVENTS; + } + // remove read watch + EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata); + kevent(kqfd, kev, 1, nullptr, 0, nullptr); + // throw exception + throw new std::system_error(errno, std::system_category()); + } + + return 0; +#endif + } + + // flags specifies which watch to remove; ignored if the loop doesn't support + // separate read/write watches. + void remove_fd_watch(int fd, int flags) + { + remove_filter((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd); + } + + void remove_fd_watch_nolock(int fd, int flags) + { + remove_fd_watch(fd, flags); + } + + void remove_bidi_fd_watch(int fd) noexcept + { + struct kevent kev[2]; + EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr); + EV_SET(&kev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr); + + kevent(kqfd, kev, 2, nullptr, 0, nullptr); + } + + void enable_fd_watch(int fd, void *userdata, int flags) + { + set_filter_enabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, userdata, true); + } + + void enable_fd_watch_nolock(int fd, void *userdata, int flags) + { + enable_fd_watch(fd, userdata, flags); + } + + void disable_fd_watch(int fd, int flags) + { + set_filter_enabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, nullptr, false); + } + + void disable_fd_watch_nolock(int fd, int flags) + { + disable_fd_watch(fd, flags); + } + + // If events are pending, process an unspecified number of them. + // If no events are pending, wait until one event is received and + // process this event (and possibly any other events received + // simultaneously). + // If processing an event removes a watch, there is a possibility + // that the watched event will still be reported (if it has + // occurred) before pull_events() returns. + // + // do_wait - if false, returns immediately if no events are + // pending. + void pull_events(bool do_wait) + { + struct kevent events[16]; + struct timespec ts; + + Base::lock.lock(); + sigset_t sigmask = this->get_active_sigmask(); + Base::lock.unlock(); + + volatile bool was_signalled = false; + + // using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is + // received during polling, it will longjmp back to here: + if (sigsetjmp(this->get_sigreceive_jmpbuf(), 1) != 0) { + this->process_signal(sigmask); + was_signalled = true; + } + + if (was_signalled) { + do_wait = false; + } + + ts.tv_sec = 0; + ts.tv_nsec = 0; + + std::atomic_signal_fence(std::memory_order_release); + this->sigmaskf(SIG_UNBLOCK, &sigmask, nullptr); + int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts); + this->sigmaskf(SIG_BLOCK, &sigmask, nullptr); + if (r == -1 || r == 0) { + // signal or no events + return; + } + + do { + process_events(events, r); + r = kevent(kqfd, nullptr, 0, events, 16, &ts); + } while (r > 0); + } +}; + +} // end namespace diff --git a/src/dasynq/dasynq-select.h b/src/dasynq/dasynq-select.h index a10e81a..29a1ae3 100644 --- a/src/dasynq/dasynq-select.h +++ b/src/dasynq/dasynq-select.h @@ -13,6 +13,7 @@ #include #include "dasynq-config.h" +#include "dasynq-signal.h" // "pselect"-based event loop mechanism. // @@ -21,40 +22,10 @@ namespace dasynq { template class select_events; -class select_traits +class select_traits : public signal_traits { public: - class sigdata_t - { - template friend class select_events; - - siginfo_t info; - - public: - // mandatory: - int get_signo() { return info.si_signo; } - int get_sicode() { return info.si_code; } - pid_t get_sipid() { return info.si_pid; } - uid_t get_siuid() { return info.si_uid; } - void * get_siaddr() { return info.si_addr; } - int get_sistatus() { return info.si_status; } - int get_sival_int() { return info.si_value.sival_int; } - void * get_sival_ptr() { return info.si_value.sival_ptr; } - - // XSI - int get_sierrno() { return info.si_errno; } - - // XSR (streams) OB (obselete) -#if !defined(__OpenBSD__) - // Note: OpenBSD doesn't have this; most other systems do. Technically it is part of the STREAMS - // interface. - int get_siband() { return info.si_band; } -#endif - - void set_signo(int signo) { info.si_signo = signo; } - }; - class fd_r; // File descriptor optional storage. If the mechanism can return the file descriptor, this @@ -89,66 +60,13 @@ class select_traits constexpr static bool supports_non_oneshot_fd = false; }; -namespace dprivate { -namespace select_mech { - -// We need to declare and define a non-static data variable, "siginfo_p", in this header, without -// violating the "one definition rule". The only way to do that is via a template, even though we -// don't otherwise need a template here: -template class sig_capture_templ -{ - public: - static siginfo_t siginfo_cap; - static sigjmp_buf rjmpbuf; - - static void signal_handler(int signo, siginfo_t *siginfo, void *v) - { - siginfo_cap = *siginfo; - siglongjmp(rjmpbuf, 1); - } -}; -template siginfo_t sig_capture_templ::siginfo_cap; -template sigjmp_buf sig_capture_templ::rjmpbuf; - -using sig_capture = sig_capture_templ<>; - -inline void prepare_signal(int signo) -{ - struct sigaction the_action; - the_action.sa_sigaction = sig_capture::signal_handler; - the_action.sa_flags = SA_SIGINFO; - sigfillset(&the_action.sa_mask); - - sigaction(signo, &the_action, nullptr); -} - -inline sigjmp_buf &get_sigreceive_jmpbuf() -{ - return sig_capture::rjmpbuf; -} - -inline void unprep_signal(int signo) -{ - signal(signo, SIG_DFL); -} - -inline siginfo_t * get_siginfo() -{ - return &sig_capture::siginfo_cap; -} - -} } // namespace dasynq :: select_mech - -template class select_events : public Base +template class select_events : public signal_events { fd_set read_set; fd_set write_set; //fd_set error_set; // logical OR of both the above int max_fd = 0; // highest fd in any of the sets - sigset_t active_sigmask; // mask out unwatched signals i.e. active=0 - void * sig_userdata[NSIG]; - // userdata pointers in read and write respectively, for each fd: std::vector rd_udata; std::vector wr_udata; @@ -159,7 +77,6 @@ template class select_events : public Base // receive_signal(sigdata_t &, user *) noexcept // receive_fd_event(fd_r, user *, int flags) noexcept - using sigdata_t = select_traits::sigdata_t; using fd_r = typename select_traits::fd_r; void process_events(fd_set *read_set_p, fd_set *write_set_p, fd_set *error_set_p) @@ -204,7 +121,6 @@ template class select_events : public Base { FD_ZERO(&read_set); FD_ZERO(&write_set); - sigfillset(&active_sigmask); Base::init(this); } @@ -333,44 +249,6 @@ template class select_events : public Base disable_fd_watch_nolock(fd, flags); } - // Note signal should be masked before call. - void add_signal_watch(int signo, void *userdata) - { - std::lock_guard guard(Base::lock); - add_signal_watch_nolock(signo, userdata); - } - - // Note signal should be masked before call. - void add_signal_watch_nolock(int signo, void *userdata) - { - sig_userdata[signo] = userdata; - sigdelset(&active_sigmask, signo); - dprivate::select_mech::prepare_signal(signo); - } - - // Note, called with lock held: - void rearm_signal_watch_nolock(int signo, void *userdata) noexcept - { - sig_userdata[signo] = userdata; - sigdelset(&active_sigmask, signo); - } - - void remove_signal_watch_nolock(int signo) noexcept - { - dprivate::select_mech::unprep_signal(signo); - sigaddset(&active_sigmask, signo); - sig_userdata[signo] = nullptr; - // No need to signal other threads - } - - void remove_signal_watch(int signo) noexcept - { - std::lock_guard guard(Base::lock); - remove_signal_watch_nolock(signo); - } - - public: - // If events are pending, process an unspecified number of them. // If no events are pending, wait until one event is received and // process this event (and possibly any other events received @@ -383,7 +261,7 @@ template class select_events : public Base // pending. void pull_events(bool do_wait) noexcept { - using namespace dprivate::select_mech; + //using namespace dprivate::select_mech; struct timespec ts; ts.tv_sec = 0; @@ -398,6 +276,8 @@ template class select_events : public Base write_set_c = write_set; err_set = read_set; + const sigset_t &active_sigmask = this->get_active_sigmask(); + sigset_t sigmask; this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask); // This is horrible, but hopefully will be optimised well. POSIX gives no way to combine signal @@ -414,18 +294,8 @@ template class select_events : public Base // using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is // received during polling, it will longjmp back to here: - if (sigsetjmp(get_sigreceive_jmpbuf(), 1) != 0) { - std::atomic_signal_fence(std::memory_order_acquire); - auto * sinfo = get_siginfo(); - sigdata_t sigdata; - sigdata.info = *sinfo; - Base::lock.lock(); - void *udata = sig_userdata[sinfo->si_signo]; - if (udata != nullptr && Base::receive_signal(*this, sigdata, udata)) { - sigaddset(&sigmask, sinfo->si_signo); - sigaddset(&active_sigmask, sinfo->si_signo); - } - Base::lock.unlock(); + if (sigsetjmp(this->get_sigreceive_jmpbuf(), 1) != 0) { + this->process_signal(sigmask); was_signalled = true; } diff --git a/src/dasynq/dasynq-signal.h b/src/dasynq/dasynq-signal.h new file mode 100644 index 0000000..dc9a840 --- /dev/null +++ b/src/dasynq/dasynq-signal.h @@ -0,0 +1,215 @@ +#include + +#include +#include + +// Support for the standard POSIX signal mechanisms. This can be used by backends that don't +// otherwise support receiving signals. It is not particularly nice (it involves using longjmp +// out of a signal handler, which POSIX mildly frowns upon) but it's really the only viable way +// to process signals together with file descriptor / other events and obtain the full siginfo_t +// data passed to the signal handler. + +namespace dasynq { + +class signal_traits +{ + public: + + class sigdata_t + { + template friend class signal_events; + + siginfo_t info; + + public: + // mandatory: + int get_signo() { return info.si_signo; } + int get_sicode() { return info.si_code; } + pid_t get_sipid() { return info.si_pid; } + uid_t get_siuid() { return info.si_uid; } + void * get_siaddr() { return info.si_addr; } + int get_sistatus() { return info.si_status; } + int get_sival_int() { return info.si_value.sival_int; } + void * get_sival_ptr() { return info.si_value.sival_ptr; } + + // XSI + int get_sierrno() { return info.si_errno; } + + // XSR (streams) OB (obselete) +#if !defined(__OpenBSD__) + // Note: OpenBSD doesn't have this; most other systems do. Technically it is part of the STREAMS + // interface. + int get_siband() { return info.si_band; } +#endif + + void set_signo(int signo) { info.si_signo = signo; } + }; + + constexpr static bool interrupt_after_signal_add = true; +}; + +namespace dprivate { +namespace signal_mech { + +// We need to declare and define a non-static data variable, "siginfo_p", in this header, without +// violating the "one definition rule". The only way to do that is via a template, even though we +// don't otherwise need a template here: +template class sig_capture_templ +{ + public: + static siginfo_t siginfo_cap; + static sigjmp_buf rjmpbuf; + + static void signal_handler(int signo, siginfo_t *siginfo, void *v) + { + siginfo_cap = *siginfo; + siglongjmp(rjmpbuf, 1); + } +}; +template siginfo_t sig_capture_templ::siginfo_cap; +template sigjmp_buf sig_capture_templ::rjmpbuf; + +using sig_capture = sig_capture_templ<>; + +inline void prepare_signal(int signo) +{ + struct sigaction the_action; + the_action.sa_sigaction = sig_capture::signal_handler; + the_action.sa_flags = SA_SIGINFO; + sigfillset(&the_action.sa_mask); + + sigaction(signo, &the_action, nullptr); +} + +inline sigjmp_buf &get_sigreceive_jmpbuf() +{ + return sig_capture::rjmpbuf; +} + +inline void unprep_signal(int signo) +{ + signal(signo, SIG_DFL); +} + +inline siginfo_t * get_siginfo() +{ + return &sig_capture::siginfo_cap; +} + +} } // namespace dasynq :: signal_mech + +// signal_events template. +// +// Active (watched and enabled) signals are maintained as a signal mask, which either has active +// signals in the mask or inactive signals in the mask, depending on the mask_enables parameter. +// (if mask_enables is true, active signals are in the mask). Which is more convenient depends +// exactly on how the mask will be used. +// +template class signal_events : public Base +{ + sigset_t active_sigmask; // mask out unwatched signals i.e. active=0 + void * sig_userdata[NSIG]; + + using sigdata_t = signal_traits::sigdata_t; + + protected: + + signal_events() + { + if (mask_enables) { + sigemptyset(&active_sigmask); + } + else { + sigfillset(&active_sigmask); + } + } + + const sigset_t &get_active_sigmask() + { + return active_sigmask; + } + + sigjmp_buf &get_sigreceive_jmpbuf() + { + return dprivate::signal_mech::get_sigreceive_jmpbuf(); + } + + void process_signal(sigset_t &sigmask) + { + using namespace dprivate::signal_mech; + std::atomic_signal_fence(std::memory_order_acquire); + auto * sinfo = get_siginfo(); + sigdata_t sigdata; + sigdata.info = *sinfo; + + Base::lock.lock(); + void *udata = sig_userdata[sinfo->si_signo]; + if (udata != nullptr && Base::receive_signal(*this, sigdata, udata)) { + if (mask_enables) { + sigdelset(&sigmask, sinfo->si_signo); + sigdelset(&active_sigmask, sinfo->si_signo); + } + else { + sigaddset(&sigmask, sinfo->si_signo); + sigaddset(&active_sigmask, sinfo->si_signo); + } + } + Base::lock.unlock(); + } + + public: + + // Note signal should be masked before call. + void add_signal_watch(int signo, void *userdata) + { + std::lock_guard guard(Base::lock); + add_signal_watch_nolock(signo, userdata); + } + + // Note signal should be masked before call. + void add_signal_watch_nolock(int signo, void *userdata) + { + sig_userdata[signo] = userdata; + if (mask_enables) { + sigaddset(&active_sigmask, signo); + } + else { + sigdelset(&active_sigmask, signo); + } + dprivate::signal_mech::prepare_signal(signo); + } + + // Note, called with lock held: + void rearm_signal_watch_nolock(int signo, void *userdata) noexcept + { + sig_userdata[signo] = userdata; + if (mask_enables) { + sigaddset(&active_sigmask, signo); + } + else { + sigdelset(&active_sigmask, signo); + } + } + + void remove_signal_watch_nolock(int signo) noexcept + { + dprivate::signal_mech::unprep_signal(signo); + if (mask_enables) { + sigdelset(&active_sigmask, signo); + } + else { + sigaddset(&active_sigmask, signo); + } + sig_userdata[signo] = nullptr; + // No need to signal other threads + } + + void remove_signal_watch(int signo) noexcept + { + std::lock_guard guard(Base::lock); + remove_signal_watch_nolock(signo); + } + +}; + +} diff --git a/src/dasynq/dasynq.h b/src/dasynq/dasynq.h index 68175c4..b831740 100644 --- a/src/dasynq/dasynq.h +++ b/src/dasynq/dasynq.h @@ -8,21 +8,24 @@ #include "dasynq-interrupt.h" #include "dasynq-util.h" -// Dasynq uses a "mix-in" pattern to produce an event loop implementation incorporating selectable implementations of -// various components (main backend, timers, child process watch mechanism etc). In C++ this can be achieved by -// a template for some component which extends its own type parameter: +// Dasynq uses a "mix-in" pattern to produce an event loop implementation incorporating selectable +// implementations of various components (main backend, timers, child process watch mechanism etc). In C++ +// this can be achieved by a template for some component which extends its own type parameter: // // template class X : public B { .... } // -// We can chain several such components together (and so so below) to "mix in" the functionality of each into the final -// class, eg: +// (Note that in a sense this is actually the opposite of the so-called "Curiously Recurring Template" +// pattern, which can be used to achieve a similar goal). We can chain several such components together to +// "mix in" the functionality of each into the final class, eg: // -// template using Loop = epoll_loop>>>; +// template using loop_t = +// epoll_loop>>>; // -// (which defines an alias template "Loop", whose implementation will use the epoll backend, a standard interrupt channel -// implementation, a timerfd-based timer implementation, and the standard child process watch implementation). -// We sometimes need the base class to be able to call derived-class members: to do this we pass a reference to -// the derived instance into a templated method in the base, called "init": +// (which defines an alias template "loop_t", whose implementation will use the epoll backend, a standard +// interrupt channel implementation, a timerfd-based timer implementation, and the standard child process +// watch implementation). We sometimes need the base class to be able to call derived-class members: to do +// this we pass a reference to the derived instance into a template member function in the base, for example +// the "init" function: // // template void init(T *derived) // { @@ -32,13 +35,52 @@ // Base::init(derived); // } // -// At the base all this is the event_dispatch class, defined below, which receives event notifications and inserts -// them into a queue for processing. The event_loop class, also below, wraps this (via composition) in an interface -// which can be used to register/de-regsiter/enable/disable event watchers, and which can process the queued events -// by calling the watcher callbacks. The event_loop class also provides some synchronisation to ensure thread-safety. - -#if DASYNQ_HAVE_KQUEUE -#include "dasynq-kqueue.h" +// The 'loop_t' defined above is a template for a usable backend mechanism for the event_loop template +// class. At the base all this is the event_dispatch class, defined below, which receives event +// notifications and inserts them into a queue for processing. The event_loop class, also below, wraps this +// (via composition) in an interface which can be used to register/de-register/enable/disable event +// watchers, and which can process the queued events by calling the watcher callbacks. The event_loop class +// also provides some synchronisation to ensure thread-safety, and abstracts away some differences between +// backends. +// +// The differences are exposed as traits, partly via a separate traits class (loop_traits_t as defined +// below, which contains the "main" traits, particularly the sigdata_t, fd_r and fd_s types). Note that the +// event_dispatch class exposes the loop traits as traits_t, and these are then potentially augmented at +// each stage of the mechanism inheritance chain (i.e. the final traits are exposed as +// `loop_t::traits_t'. +// +// The trait members are: +// sigdata_t - a wrapper for the siginfo_t type or equivalent used to pass signal parameters +// fd_r - a file descriptor wrapper, if the backend is able to retrieve the file descriptor when +// it receives an fd event. Not all backends can do this. +// fd_s - a file descriptor storage wrapper. If the backend can retrieve file descriptors, this +// will be empty (and ideally zero-size), otherwise it stores a file descriptor. +// With an fd_r and fd_s instance you can always retrieve the file descriptor: +// `fdr.get_fd(fds)' will return it. +// has_bidi_fd_watch +// - boolean indicating whether a single watch can support watching for both input and output +// events simultaneously +// has_separate_rw_fd_watches +// - boolean indicating whether it is possible to add separate input and output watches for the +// same fd. Either this or has_bidi_fd_watch must be true. +// interrupt_after_fd_add +// - boolean indicating if a loop interrupt must be forced after adding/enabling an fd watch. +// interrupt_after_signal_add +// - boolean indicating if a loop interrupt must be forced after adding or enabling a signal +// watch. +// supports_non_oneshot_fd +// - boolean; if true, event_dispatch can arm an fd watch without ONESHOT and returning zero +// events from receive_fd_event (the event notification function) will leave the descriptor +// armed. If false, all fd watches are effectively ONESHOT (they can be re-armed immediately +// after delivery by returning an appropriate event flag mask). +// full_timer_support +// - boolean indicating that the monotonic and system clocks are actually different clocks and +// that timers against the system clock will work correctly if the system clock time is +// adjusted. If false, the monotic clock may not be present at all (monotonic clock will map +// to system clock), and timers against either clock are not guaranteed to work correctly if +// the system clock is adjusted. + +#if DASYNQ_HAVE_EPOLL <= 0 #if _POSIX_TIMERS > 0 #include "dasynq-posixtimer.h" namespace dasynq { @@ -50,11 +92,24 @@ namespace dasynq { template using timer_events = itimer_events; } #endif +#endif + +#if DASYNQ_HAVE_KQUEUE +#if DASYNQ_KQUEUE_MACOS_WORKAROUND +#include "dasynq-kqueue-macos.h" +#include "dasynq-childproc.h" +namespace dasynq { + template using loop_t = macos_kqueue_loop>>>; + using loop_traits_t = macos_kqueue_traits; +} +#else +#include "dasynq-kqueue.h" #include "dasynq-childproc.h" namespace dasynq { template using loop_t = kqueue_loop>>>; using loop_traits_t = kqueue_traits; } +#endif #elif DASYNQ_HAVE_EPOLL #include "dasynq-epoll.h" #include "dasynq-timerfd.h" @@ -143,6 +198,7 @@ namespace dprivate { typedef std::condition_variable_any condvar; }; + // For a single-threaded loop, the waitqueue is a no-op: template <> class waitqueue_node { // Specialised waitqueue_node for null_mutex. @@ -248,21 +304,72 @@ namespace dprivate { } }; + // friend of event_loop for giving access to various private members + class loop_access { + public: + template + static typename Loop::mutex_t &get_base_lock(Loop &loop) noexcept + { + return loop.get_base_lock(); + } + + template + static rearm process_fd_rearm(Loop &loop, typename Loop::base_fd_watcher *bfw, + rearm rearm_type, bool is_multi_watch) noexcept + { + return loop.process_fd_rearm(bfw, rearm_type, is_multi_watch); + } + + template + static rearm process_secondary_rearm(Loop &loop, typename Loop::base_bidi_fd_watcher * bdfw, + base_watcher * outw, rearm rearm_type) noexcept + { + return loop.process_secondary_rearm(bdfw, outw, rearm_type); + } + + template + static void process_signal_rearm(Loop &loop, typename Loop::base_signal_watcher * bsw, + rearm rearm_type) noexcept + { + loop.process_signal_rearm(bsw, rearm_type); + } + + template + static void process_child_watch_rearm(Loop &loop, typename Loop::base_child_watcher *bcw, + rearm rearm_type) noexcept + { + loop.process_child_watch_rearm(bcw, rearm_type); + } + + template + static void process_timer_rearm(Loop &loop, typename Loop::base_timer_watcher *btw, + rearm rearm_type) noexcept + { + loop.process_timer_rearm(btw, rearm_type); + } + + template + static void requeue_watcher(Loop &loop, base_watcher *watcher) noexcept + { + loop.requeue_watcher(watcher); + } + }; + // Do standard post-dispatch processing for a watcher. This handles the case of removing or - // re-queing watchers depending on the rearm type. + // re-queueing watchers depending on the rearm type. template void post_dispatch(Loop &loop, base_watcher *watcher, rearm rearm_type) { if (rearm_type == rearm::REMOVE) { - loop.get_base_lock().unlock(); + loop_access::get_base_lock(loop).unlock(); watcher->watch_removed(); - loop.get_base_lock().lock(); + loop_access::get_base_lock(loop).lock(); } else if (rearm_type == rearm::REQUEUE) { - loop.requeue_watcher(watcher); + loop_access::requeue_watcher(loop, watcher); } } - // This class serves as the base class (mixin) for the AEN mechanism class. + // This class serves as the base class (mixin) for the backend mechanism. // // The event_dispatch class maintains the queued event data structures. It inserts watchers // into the queue when events are received (receiveXXX methods). It also owns a mutex used @@ -271,12 +378,12 @@ namespace dprivate { // In general the methods should be called with lock held. In practice this means that the // event loop backend implementations must obtain the lock; they are also free to use it to // protect their own internal data structures. - template class event_dispatch + template class event_dispatch { - friend class dasynq::event_loop;; + friend class dasynq::event_loop;; public: - using mutex_t = T_Mutex; + using mutex_t = typename LoopTraits::mutex_t; using traits_t = Traits; private: @@ -284,11 +391,9 @@ namespace dprivate { // queue data structure/pointer prio_queue event_queue; - using base_signal_watcher = dasynq::dprivate::base_signal_watcher; - using base_fd_watcher = dasynq::dprivate::base_fd_watcher; - using base_bidi_fd_watcher = dasynq::dprivate::base_bidi_fd_watcher; - using base_child_watcher = dasynq::dprivate::base_child_watcher; - using base_timer_watcher = dasynq::dprivate::base_timer_watcher; + using base_signal_watcher = dprivate::base_signal_watcher; + using base_child_watcher = dprivate::base_child_watcher; + using base_timer_watcher = dprivate::base_timer_watcher; // Add a watcher into the queueing system (but don't queue it). Call with lock held. // may throw: std::bad_alloc @@ -439,7 +544,7 @@ namespace dprivate { } } - // Queue a watcher for reomval, or issue "removed" callback to it. + // Queue a watcher for removal, or issue "removed" callback to it. // Call with lock free. void issue_delete(base_bidi_fd_watcher *watcher) noexcept { @@ -478,34 +583,39 @@ namespace dprivate { event_dispatch() { } event_dispatch(const event_dispatch &) = delete; }; + } -// This is the main event_loop implementation. It serves as an interface to the event loop -// backend (of which it maintains an internal instance). It also serialises waits the backend -// and provides safe deletion of watchers (see comments inline). +// This is the main event_loop implementation. It serves as an interface to the event loop backend (of which +// it maintains an internal instance). It also serialises polling the backend and provides safe deletion of +// watchers (see comments inline). +// +// The T_Mutex type parameter specifies the mutex type. A null_mutex can be used for a single-threaded event +// loop; std::mutex, or any mutex providing a compatible interface, can be used for a thread-safe event +// loop. +// +// The Traits type parameter specifies any required traits for the event loop. This specifies the back-end +// to use (backend_t, a template) and the basic back-end traits (backend_traits_t). +// The default is `default_traits'. +// template class event_loop { using my_event_loop_t = event_loop; + friend class dprivate::fd_watcher; friend class dprivate::bidi_fd_watcher; friend class dprivate::signal_watcher; friend class dprivate::child_proc_watcher; friend class dprivate::timer; - friend void dprivate::post_dispatch(my_event_loop_t &loop, - dprivate::base_watcher *watcher, rearm rearm_type); - - template friend class dprivate::fd_watcher_impl; - template friend class dprivate::bidi_fd_watcher_impl; - template friend class dprivate::signal_watcher_impl; - template friend class dprivate::child_proc_watcher_impl; - template friend class dprivate::timer_impl; + friend class dprivate::loop_access; using backend_traits_t = typename Traits::backend_traits_t; - template using event_dispatch = dprivate::event_dispatch; - using loop_mech_t = typename Traits::template backend_t>; + template using event_dispatch = dprivate::event_dispatch; + using dispatch_t = event_dispatch; + using loop_mech_t = typename Traits::template backend_t; using reaper_mutex_t = typename loop_mech_t::reaper_mutex_t; public: @@ -517,11 +627,11 @@ class event_loop template using waitqueue = dprivate::waitqueue; template using waitqueue_node = dprivate::waitqueue_node; using base_watcher = dprivate::base_watcher; - using base_signal_watcher = dprivate::base_signal_watcher; - using base_fd_watcher = dprivate::base_fd_watcher; - using base_bidi_fd_watcher = dprivate::base_bidi_fd_watcher; - using base_child_watcher = dprivate::base_child_watcher; - using base_timer_watcher = dprivate::base_timer_watcher; + using base_signal_watcher = dprivate::base_signal_watcher; + using base_fd_watcher = dprivate::base_fd_watcher; + using base_bidi_fd_watcher = dprivate::base_bidi_fd_watcher; + using base_child_watcher = dprivate::base_child_watcher; + using base_timer_watcher = dprivate::base_timer_watcher; using watch_type_t = dprivate::watch_type_t; loop_mech_t loop_mech; @@ -585,8 +695,7 @@ class event_loop void register_signal(base_signal_watcher *callBack, int signo) { - auto & ed = (event_dispatch &) loop_mech; - std::lock_guard guard(ed.lock); + std::lock_guard guard(loop_mech.lock); loop_mech.prepare_watcher(callBack); try { @@ -608,16 +717,14 @@ class event_loop waitqueue_node qnode; get_attn_lock(qnode); - auto & ed = (event_dispatch &) loop_mech; - ed.issue_delete(callBack); + loop_mech.issue_delete(callBack); release_lock(qnode); } void register_fd(base_fd_watcher *callback, int fd, int eventmask, bool enabled, bool emulate = false) { - auto & ed = (event_dispatch &) loop_mech; - std::lock_guard guard(ed.lock); + std::lock_guard guard(loop_mech.lock); loop_mech.prepare_watcher(callback); @@ -642,10 +749,11 @@ class event_loop } } + // Register a bidi fd watcher. The watch_flags should already be set to the eventmask to watch + // (i.e. eventmask == callback->watch_flags is a pre-condition). void register_fd(base_bidi_fd_watcher *callback, int fd, int eventmask, bool emulate = false) { - auto & ed = (event_dispatch &) loop_mech; - std::lock_guard guard(ed.lock); + std::lock_guard guard(loop_mech.lock); loop_mech.prepare_watcher(callback); try { @@ -657,6 +765,7 @@ class event_loop if (r & IN_EVENTS) { callback->emulatefd = true; if (eventmask & IN_EVENTS) { + callback->watch_flags &= ~IN_EVENTS; requeue_watcher(callback); } } @@ -667,6 +776,7 @@ class event_loop if (r & OUT_EVENTS) { callback->out_watcher.emulatefd = true; if (eventmask & OUT_EVENTS) { + callback->watch_flags &= ~OUT_EVENTS; requeue_watcher(&callback->out_watcher); } } @@ -679,9 +789,11 @@ class event_loop callback->emulatefd = true; callback->out_watcher.emulatefd = true; if (eventmask & IN_EVENTS) { + callback->watch_flags &= ~IN_EVENTS; requeue_watcher(callback); } if (eventmask & OUT_EVENTS) { + callback->watch_flags &= ~OUT_EVENTS; requeue_watcher(&callback->out_watcher); } } @@ -734,7 +846,7 @@ class event_loop void deregister(base_fd_watcher *callback, int fd) noexcept { if (callback->emulatefd) { - auto & ed = (event_dispatch &) loop_mech; + auto & ed = (dispatch_t &) loop_mech; ed.issue_delete(callback); return; } @@ -744,7 +856,7 @@ class event_loop waitqueue_node qnode; get_attn_lock(qnode); - auto & ed = (event_dispatch &) loop_mech; + auto & ed = (dispatch_t &) loop_mech; ed.issue_delete(callback); release_lock(qnode); @@ -762,7 +874,7 @@ class event_loop waitqueue_node qnode; get_attn_lock(qnode); - event_dispatch & ed = (event_dispatch &) loop_mech; + dispatch_t & ed = (dispatch_t &) loop_mech; ed.issue_delete(callback); release_lock(qnode); @@ -770,8 +882,7 @@ class event_loop void reserve_child_watch(base_child_watcher *callback) { - auto & ed = (event_dispatch &) loop_mech; - std::lock_guard guard(ed.lock); + std::lock_guard guard(loop_mech.lock); loop_mech.prepare_watcher(callback); try { @@ -785,8 +896,7 @@ class event_loop void unreserve(base_child_watcher *callback) noexcept { - auto & ed = (event_dispatch &) loop_mech; - std::lock_guard guard(ed.lock); + std::lock_guard guard(loop_mech.lock); loop_mech.unreserve_child_watch(callback->watch_handle); loop_mech.release_watcher(callback); @@ -794,8 +904,7 @@ class event_loop void register_child(base_child_watcher *callback, pid_t child) { - auto & ed = (event_dispatch &) loop_mech; - std::lock_guard guard(ed.lock); + std::lock_guard guard(loop_mech.lock); loop_mech.prepare_watcher(callback); try { @@ -824,8 +933,7 @@ class event_loop waitqueue_node qnode; get_attn_lock(qnode); - event_dispatch & ed = (event_dispatch &) loop_mech; - ed.issue_delete(callback); + loop_mech.issue_delete(callback); release_lock(qnode); } @@ -839,8 +947,7 @@ class event_loop void register_timer(base_timer_watcher *callback, clock_type clock) { - auto & ed = (event_dispatch &) loop_mech; - std::lock_guard guard(ed.lock); + std::lock_guard guard(loop_mech.lock); loop_mech.prepare_watcher(callback); try { @@ -897,8 +1004,7 @@ class event_loop waitqueue_node qnode; get_attn_lock(qnode); - event_dispatch & ed = (event_dispatch &) loop_mech; - ed.issue_delete(callback); + loop_mech.issue_delete(callback); release_lock(qnode); } @@ -911,25 +1017,18 @@ class event_loop void requeue_watcher(base_watcher *watcher) noexcept { loop_mech.queue_watcher(watcher); - - // We need to signal any thread that is currently waiting on the loop mechanism, so that it wakes - // and processes the newly queued watcher: - - wait_lock.lock(); - bool attn_q_empty = attn_waitqueue.is_empty(); - wait_lock.unlock(); - - if (! attn_q_empty) { - loop_mech.interrupt_wait(); - } + interrupt_if_necessary(); } // Interrupt the current poll-waiter, if necessary - that is, if the loop is multi-thread safe, and if // there is currently another thread polling the backend event mechanism. void interrupt_if_necessary() { - std::lock_guard guard(wait_lock); - if (! attn_waitqueue.is_empty()) { // (always false for single-threaded loops) + wait_lock.lock(); + bool attn_q_empty = attn_waitqueue.is_empty(); // (always false for single-threaded loops) + wait_lock.unlock(); + + if (! attn_q_empty) { loop_mech.interrupt_wait(); } } @@ -1055,9 +1154,8 @@ class event_loop } } else if (rearm_type == rearm::REARM) { - bdfw->watch_flags |= IN_EVENTS; - if (! emulatedfd) { + bdfw->watch_flags |= IN_EVENTS; if (! backend_traits_t::has_separate_rw_fd_watches) { int watch_flags = bdfw->watch_flags; set_fd_enabled_nolock(bdfw, bdfw->watch_fd, @@ -1068,12 +1166,14 @@ class event_loop } } else { + bdfw->watch_flags &= ~IN_EVENTS; rearm_type = rearm::REQUEUE; } } else if (rearm_type == rearm::NOOP) { if (bdfw->emulatefd) { if (bdfw->watch_flags & IN_EVENTS) { + bdfw->watch_flags &= ~IN_EVENTS; rearm_type = rearm::REQUEUE; } } @@ -1125,11 +1225,12 @@ class event_loop bdfw->watch_flags &= ~OUT_EVENTS; } else if (rearm_type == rearm::REARM) { - bdfw->watch_flags |= OUT_EVENTS; + bdfw->watch_flags &= ~OUT_EVENTS; rearm_type = rearm::REQUEUE; } else if (rearm_type == rearm::NOOP) { if (bdfw->watch_flags & OUT_EVENTS) { + bdfw->watch_flags &= ~OUT_EVENTS; rearm_type = rearm::REQUEUE; } } @@ -1209,14 +1310,13 @@ class event_loop // no limit. bool process_events(int limit) noexcept { - auto & ed = (event_dispatch &) loop_mech; - ed.lock.lock(); + loop_mech.lock.lock(); if (limit == 0) { return false; } - base_watcher * pqueue = ed.pull_event(); + base_watcher * pqueue = loop_mech.pull_event(); bool active = false; while (pqueue != nullptr) { @@ -1243,7 +1343,7 @@ class event_loop // issue a secondary dispatch: bbfw->dispatch_second(this); - pqueue = ed.pull_event(); + pqueue = loop_mech.pull_event(); continue; } @@ -1252,10 +1352,10 @@ class event_loop limit--; if (limit == 0) break; } - pqueue = ed.pull_event(); + pqueue = loop_mech.pull_event(); } - ed.lock.unlock(); + loop_mech.lock.unlock(); return active; } @@ -1329,7 +1429,7 @@ namespace dprivate { // Posix signal event watcher template -class signal_watcher : private dprivate::base_signal_watcher +class signal_watcher : private dprivate::base_signal_watcher { template friend class signal_watcher_impl; @@ -1396,11 +1496,11 @@ class signal_watcher_impl : public signal_watcher void dispatch(void *loop_ptr) noexcept override { EventLoop &loop = *static_cast(loop_ptr); - loop.get_base_lock().unlock(); + loop_access::get_base_lock(loop).unlock(); auto rearm_type = static_cast(this)->received(loop, this->siginfo.get_signo(), this->siginfo); - loop.get_base_lock().lock(); + loop_access::get_base_lock(loop).lock(); if (rearm_type != rearm::REMOVED) { @@ -1410,7 +1510,7 @@ class signal_watcher_impl : public signal_watcher rearm_type = rearm::REMOVE; } - loop.process_signal_rearm(this, rearm_type); + loop_access::process_signal_rearm(loop, this, rearm_type); post_dispatch(loop, this, rearm_type); } @@ -1419,7 +1519,7 @@ class signal_watcher_impl : public signal_watcher // Posix file descriptor event watcher template -class fd_watcher : private dprivate::base_fd_watcher +class fd_watcher : private dprivate::base_fd_watcher { template friend class fd_watcher_impl; @@ -1494,11 +1594,15 @@ class fd_watcher : private dprivate::base_fd_watcher guard(eloop.get_base_lock()); if (this->emulatefd) { + if (enable && ! this->emulate_enabled) { + loop_access::requeue_watcher(eloop, this); + } this->emulate_enabled = enable; } else { eloop.set_fd_enabled_nolock(this, this->watch_fd, this->watch_flags, enable); } + if (! enable) { eloop.dequeue_watcher(this); } @@ -1549,11 +1653,11 @@ class fd_watcher_impl : public fd_watcher // In case emulating, clear enabled here; REARM or explicit set_enabled will re-enable. this->emulate_enabled = false; - loop.get_base_lock().unlock(); + loop_access::get_base_lock(loop).unlock(); auto rearm_type = static_cast(this)->fd_event(loop, this->watch_fd, this->event_flags); - loop.get_base_lock().lock(); + loop_access::get_base_lock(loop).lock(); if (rearm_type != rearm::REMOVED) { this->event_flags = 0; @@ -1563,7 +1667,7 @@ class fd_watcher_impl : public fd_watcher rearm_type = rearm::REMOVE; } - rearm_type = loop.process_fd_rearm(this, rearm_type, false); + rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type, false); post_dispatch(loop, this, rearm_type); } @@ -1575,7 +1679,7 @@ class fd_watcher_impl : public fd_watcher // This watcher type has two event notification methods which can both potentially be // active at the same time. template -class bidi_fd_watcher : private dprivate::base_bidi_fd_watcher +class bidi_fd_watcher : private dprivate::base_bidi_fd_watcher { template friend class bidi_fd_watcher_impl; @@ -1585,6 +1689,7 @@ class bidi_fd_watcher : private dprivate::base_bidi_fd_watcherwatch_flags; if (b) { this->watch_flags |= events; @@ -1605,6 +1710,13 @@ class bidi_fd_watcher : private dprivate::base_bidi_fd_watcherwatch_flags & IO_EVENTS) != 0); } } + else { + // emulation: if enabling a previously disabled watcher, must queue now: + if (b && (orig_flags != this->watch_flags)) { + this->watch_flags = orig_flags; + loop_access::requeue_watcher(eloop, watcher); + } + } if (! b) { eloop.dequeue_watcher(watcher); @@ -1745,11 +1857,11 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher { EventLoop &loop = *static_cast(loop_ptr); this->emulate_enabled = false; - loop.get_base_lock().unlock(); + loop_access::get_base_lock(loop).unlock(); auto rearm_type = static_cast(this)->read_ready(loop, this->watch_fd); - loop.get_base_lock().lock(); + loop_access::get_base_lock(loop).lock(); if (rearm_type != rearm::REMOVED) { this->event_flags &= ~IN_EVENTS; @@ -1759,7 +1871,7 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher rearm_type = rearm::REMOVE; } - rearm_type = loop.process_fd_rearm(this, rearm_type, true); + rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type, true); post_dispatch(loop, this, rearm_type); } @@ -1770,11 +1882,11 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher auto &outwatcher = bidi_fd_watcher::out_watcher; EventLoop &loop = *static_cast(loop_ptr); - loop.get_base_lock().unlock(); + loop_access::get_base_lock(loop).unlock(); auto rearm_type = static_cast(this)->write_ready(loop, this->watch_fd); - loop.get_base_lock().lock(); + loop_access::get_base_lock(loop).lock(); if (rearm_type != rearm::REMOVED) { this->event_flags &= ~OUT_EVENTS; @@ -1784,7 +1896,7 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher rearm_type = rearm::REMOVE; } - rearm_type = loop.process_secondary_rearm(this, &outwatcher, rearm_type); + rearm_type = loop_access::process_secondary_rearm(loop, this, &outwatcher, rearm_type); if (rearm_type == rearm::REQUEUE) { post_dispatch(loop, &outwatcher, rearm_type); @@ -1798,7 +1910,7 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher // Child process event watcher template -class child_proc_watcher : private dprivate::base_child_watcher +class child_proc_watcher : private dprivate::base_child_watcher { template friend class child_proc_watcher_impl; @@ -1975,11 +2087,11 @@ class child_proc_watcher_impl : public child_proc_watcher void dispatch(void *loop_ptr) noexcept override { EventLoop &loop = *static_cast(loop_ptr); - loop.get_base_lock().unlock(); + loop_access::get_base_lock(loop).unlock(); auto rearm_type = static_cast(this)->status_change(loop, this->watch_pid, this->child_status); - loop.get_base_lock().lock(); + loop_access::get_base_lock(loop).lock(); if (rearm_type != rearm::REMOVED) { @@ -1989,7 +2101,7 @@ class child_proc_watcher_impl : public child_proc_watcher rearm_type = rearm::REMOVE; } - loop.process_child_watch_rearm(this, rearm_type); + loop_access::process_child_watch_rearm(loop, this, rearm_type); // rearm_type = loop.process??; post_dispatch(loop, this, rearm_type); @@ -1998,10 +2110,10 @@ class child_proc_watcher_impl : public child_proc_watcher }; template -class timer : private base_timer_watcher +class timer : private base_timer_watcher { template friend class timer_impl; - using base_t = base_timer_watcher; + using base_t = base_timer_watcher; using mutex_t = typename EventLoop::mutex_t; public: @@ -2106,13 +2218,13 @@ class timer_impl : public timer void dispatch(void *loop_ptr) noexcept override { EventLoop &loop = *static_cast(loop_ptr); - loop.get_base_lock().unlock(); + loop_access::get_base_lock(loop).unlock(); auto intervals_report = this->intervals; this->intervals = 0; auto rearm_type = static_cast(this)->timer_expiry(loop, intervals_report); - loop.get_base_lock().lock(); + loop_access::get_base_lock(loop).lock(); if (rearm_type != rearm::REMOVED) { @@ -2122,7 +2234,7 @@ class timer_impl : public timer rearm_type = rearm::REMOVE; } - loop.process_timer_rearm(this, rearm_type); + loop_access::process_timer_rearm(loop, this, rearm_type); post_dispatch(loop, this, rearm_type); } -- 2.25.1