Upgrade bundled Dasynq to 1.1.2.
authorDavin McCall <davmac@davmac.org>
Fri, 9 Feb 2018 22:04:07 +0000 (22:04 +0000)
committerDavin McCall <davmac@davmac.org>
Fri, 9 Feb 2018 22:04:07 +0000 (22:04 +0000)
src/dasynq/dasynq-basewatchers.h
src/dasynq/dasynq-childproc.h
src/dasynq/dasynq-config.h
src/dasynq/dasynq-itimer.h
src/dasynq/dasynq-kqueue-macos.h [new file with mode: 0644]
src/dasynq/dasynq-select.h
src/dasynq/dasynq-signal.h [new file with mode: 0644]
src/dasynq/dasynq.h

index 67144fba9cf64e02ddf014f493fad8c200314bed..fc5f6f57189b8276e77b98aad01779999528ce18 100644 (file)
@@ -75,7 +75,7 @@ namespace dprivate {
         TIMER
     };
 
-    template <typename T_Mutex, typename Traits, typename LoopTraits> class event_dispatch;
+    template <typename Traits, typename LoopTraits> class event_dispatch;
 
     // For FD watchers:
     // Use this watch flag to indicate that in and out events should be reported separately,
@@ -139,10 +139,10 @@ namespace dprivate {
     };
 
     // Base signal event - not part of public API
-    template <typename T_Mutex, typename T_Sigdata>
+    template <typename T_Sigdata>
     class base_signal_watcher : public base_watcher
     {
-        template <typename, typename, typename> friend class event_dispatch;
+        template <typename, typename> friend class event_dispatch;
         template <typename, typename> friend class dasynq::event_loop;
 
         protected:
@@ -154,10 +154,9 @@ namespace dprivate {
         typedef siginfo_t &siginfo_p;
     };
 
-    template <typename T_Mutex>
     class base_fd_watcher : public base_watcher
     {
-        template <typename, typename, typename> friend class event_dispatch;
+        template <typename, typename> friend class event_dispatch;
         template <typename, typename> friend class dasynq::event_loop;
 
         protected:
@@ -175,10 +174,9 @@ namespace dprivate {
         base_fd_watcher() noexcept : base_watcher(watch_type_t::FD) { }
     };
 
-    template <typename T_Mutex>
-    class base_bidi_fd_watcher : public base_fd_watcher<T_Mutex>
+    class base_bidi_fd_watcher : public base_fd_watcher
     {
-        template <typename, typename, typename> friend class event_dispatch;
+        template <typename, typename> friend class event_dispatch;
         template <typename, typename> friend class dasynq::event_loop;
 
         base_bidi_fd_watcher(const base_bidi_fd_watcher &) = delete;
@@ -195,10 +193,9 @@ namespace dprivate {
         int write_removed : 1; // write watch removed?
     };
 
-    template <typename T_Mutex>
     class base_child_watcher : public base_watcher
     {
-        template <typename, typename, typename> friend class event_dispatch;
+        template <typename, typename> friend class event_dispatch;
         template <typename, typename> friend class dasynq::event_loop;
 
         protected:
@@ -210,10 +207,9 @@ namespace dprivate {
     };
 
 
-    template <typename T_Mutex>
     class base_timer_watcher : public base_watcher
     {
-        template <typename, typename, typename> friend class event_dispatch;
+        template <typename, typename> friend class event_dispatch;
         template <typename, typename> friend class dasynq::event_loop;
 
         protected:
index c2da99c89d50b679ddf685bd6462c17247111e58..903ea398abefa34d263ec29969b399595e237965 100644 (file)
@@ -1,4 +1,8 @@
+#include <sys/types.h>
+#include <sys/wait.h>
+
 #include <signal.h>
+
 #include "dasynq-btree_set.h"
 
 namespace dasynq {
index cea00c3e1fc2f86dae4cd73f4cd26e8549398d0a..b38b387e028d58e96401cd96149b16fd39902d11 100644 (file)
 #if ! defined(DASYNQ_HAVE_KQUEUE)
 #if defined(__OpenBSD__) || defined(__APPLE__) || defined(__FreeBSD__)
 #define DASYNQ_HAVE_KQUEUE 1
+#if defined(__APPLE__)
+// kqueue on macos has "issues". See extra/macos-kqueue-bug. There is an alternate Dasyqn kqueue backend
+// which avoids the issue, which is enabled via DASYNQ_KQUEUE_MACOS_WORKAROUND.
+#define DASYNQ_KQUEUE_MACOS_WORKAROUND 1
+#endif
 #endif
 #endif
 
index c4d42a17b36846546d72734a86ae28ea23f86a41..9ecf82831426f19e626db20f9e4b19d7056663ef 100644 (file)
@@ -1,6 +1,7 @@
 #include <vector>
 #include <utility>
 
+#include <signal.h>
 #include <sys/time.h>
 #include <time.h>
 
diff --git a/src/dasynq/dasynq-kqueue-macos.h b/src/dasynq/dasynq-kqueue-macos.h
new file mode 100644 (file)
index 0000000..844a5f5
--- /dev/null
@@ -0,0 +1,441 @@
+#include <system_error>
+#include <type_traits>
+#include <unordered_map>
+#include <vector>
+#include <tuple>
+
+#include <sys/event.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <sys/stat.h>
+
+#include <unistd.h>
+#include <signal.h>
+
+#include "dasynq-config.h"
+#include "dasynq-signal.h"
+
+// "kqueue"-based event loop mechanism.
+//
+// kqueue is available on BSDs and Mac OS X, though there are subtle differences from OS to OS.
+//
+// kqueue supports watching file descriptors (input and output as separate watches only),
+// signals, child processes, and timers. Unfortunately support for the latter two is imperfect;
+// it is not possible to reserve process watches in advance; timers can only be active, count
+// down immediately when created, and cannot be reset to another time. For timers especially
+// the problems are significant: we can't allocate timers in advance, and we can't even feasibly
+// manage our own timer queue via a single kqueue-backed timer. Therefore, an alternate timer
+// mechanism must be used together with kqueue.
+
+namespace dasynq {
+
+template <class Base> class kqueue_loop;
+
+class macos_kqueue_traits : public signal_traits
+{
+    template <class Base> friend class macos_kqueue_loop;
+
+    public:
+
+    class fd_r;
+
+    // File descriptor optional storage. If the mechanism can return the file descriptor, this
+    // class will be empty, otherwise it can hold a file descriptor.
+    class fd_s {
+        public:
+        fd_s(int) { }
+
+        DASYNQ_EMPTY_BODY
+    };
+
+    // File descriptor reference (passed to event callback). If the mechanism can return the
+    // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor
+    // must be stored in an fd_s instance.
+    class fd_r {
+        int fd;
+        public:
+        int getFd(fd_s ss)
+        {
+            return fd;
+        }
+        fd_r(int nfd) : fd(nfd)
+        {
+        }
+    };
+
+    constexpr static bool has_bidi_fd_watch = false;
+    constexpr static bool has_separate_rw_fd_watches = true;
+    constexpr static bool interrupt_after_fd_add = false;
+    constexpr static bool supports_non_oneshot_fd = false;
+};
+
+#if _POSIX_REALTIME_SIGNALS > 0
+static inline void prepare_signal(int signo) { }
+static inline void unprep_signal(int signo) { }
+
+inline bool get_siginfo(int signo, siginfo_t *siginfo)
+{
+    struct timespec timeout;
+    timeout.tv_sec = 0;
+    timeout.tv_nsec = 0;
+
+    sigset_t mask;
+    sigemptyset(&mask);
+    sigaddset(&mask, signo);
+    return (sigtimedwait(&mask, siginfo, &timeout) != -1);
+}
+#else
+
+// If we have no sigtimedwait implementation, we have to retrieve signal data by establishing a
+// signal handler.
+
+// We need to declare and define a non-static data variable, "siginfo_p", in this header, without
+// violating the "one definition rule". The only way to do that is via a template, even though we
+// don't otherwise need a template here:
+template <typename T = decltype(nullptr)> class sig_capture_templ
+{
+    public:
+    static siginfo_t * siginfo_p;
+
+    static void signalHandler(int signo, siginfo_t *siginfo, void *v)
+    {
+        *siginfo_p = *siginfo;
+    }
+};
+template <typename T> siginfo_t * sig_capture_templ<T>::siginfo_p = nullptr;
+
+using sig_capture = sig_capture_templ<>;
+
+inline void prepare_signal(int signo)
+{
+    struct sigaction the_action;
+    the_action.sa_sigaction = sig_capture::signalHandler;
+    the_action.sa_flags = SA_SIGINFO;
+    sigfillset(&the_action.sa_mask);
+
+    sigaction(signo, &the_action, nullptr);
+}
+
+inline void unprep_signal(int signo)
+{
+    signal(signo, SIG_DFL);
+}
+
+inline bool get_siginfo(int signo, siginfo_t *siginfo)
+{
+    sig_capture::siginfo_p = siginfo;
+
+    sigset_t mask;
+    sigfillset(&mask);
+    sigdelset(&mask, signo);
+    sigsuspend(&mask);
+    return true;
+}
+
+#endif
+
+template <class Base> class macos_kqueue_loop : public signal_events<Base,true>
+{
+    int kqfd; // kqueue fd
+
+    // Base contains:
+    //   lock - a lock that can be used to protect internal structure.
+    //          receive*() methods will be called with lock held.
+    //   receive_signal(sigdata_t &, user *) noexcept
+    //   receive_fd_event(fd_r, user *, int flags) noexcept
+
+    using fd_r = typename macos_kqueue_traits::fd_r;
+
+    // The flag to specify poll() semantics for regular file readiness: that is, we want
+    // ready-for-read to be returned even at end of file:
+#if defined(NOTE_FILE_POLL)
+    // FreeBSD:
+    constexpr static int POLL_SEMANTICS = NOTE_FILE_POLL;
+#else
+    // Note that macOS has an "EV_POLL" defined that looks like it should give poll semantics
+    // when passed as a flag. However, it is filtered at the syscall entry so we cannot use it in
+    // kqueue. (The kernel uses it internally to implement poll()).
+    constexpr static int POLL_SEMANTICS = 0;
+#endif
+
+    void process_events(struct kevent *events, int r)
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
+        for (int i = 0; i < r; i++) {
+            if (events[i].filter == EVFILT_READ || events[i].filter == EVFILT_WRITE) {
+                int flags = events[i].filter == EVFILT_READ ? IN_EVENTS : OUT_EVENTS;
+                auto r = Base::receive_fd_event(*this, fd_r(events[i].ident), events[i].udata, flags);
+                if (std::get<0>(r) == 0) {
+                    // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for
+                    // another connection).
+                    events[i].flags = EV_DISABLE | EV_CLEAR;
+                }
+                else {
+                    events[i].flags = EV_ENABLE;
+                }
+            }
+            else {
+                events[i].flags = EV_DISABLE;
+            }
+        }
+
+        // Now we disable all received events, to simulate EV_DISPATCH:
+        kevent(kqfd, events, r, nullptr, 0, nullptr);
+    }
+
+    public:
+
+    /**
+     * kqueue_loop constructor.
+     *
+     * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
+     */
+    macos_kqueue_loop()
+    {
+        kqfd = kqueue();
+        if (kqfd == -1) {
+            throw std::system_error(errno, std::system_category());
+        }
+        Base::init(this);
+    }
+
+    ~macos_kqueue_loop()
+    {
+        close(kqfd);
+    }
+
+    void set_filter_enabled(short filterType, uintptr_t ident, void *udata, bool enable)
+    {
+        // Note, on OpenBSD enabling or disabling filter will not alter the filter parameters (udata etc);
+        // on OS X however, it will. Therefore we set udata here (to the same value as it was originally
+        // set) in order to work correctly on both kernels.
+        struct kevent kev;
+        int fflags = (filterType == EVFILT_READ) ? POLL_SEMANTICS : 0;
+        EV_SET(&kev, ident, filterType, enable ? EV_ENABLE : EV_DISABLE, fflags, 0, udata);
+        kevent(kqfd, &kev, 1, nullptr, 0, nullptr);
+    }
+
+    void remove_filter(short filterType, uintptr_t ident)
+    {
+        struct kevent kev;
+        EV_SET(&kev, ident, filterType, EV_DELETE, 0, 0, 0);
+        kevent(kqfd, &kev, 1, nullptr, 0, nullptr);
+    }
+
+    //        fd:  file descriptor to watch
+    //  userdata:  data to associate with descriptor
+    //     flags:  IN_EVENTS | OUT_EVENTS | ONE_SHOT
+    //             (only one of IN_EVENTS/OUT_EVENTS can be specified)
+    // soft_fail:  true if unsupported file descriptors should fail by returning false instead
+    //             of throwing an exception
+    // returns: true on success; false if file descriptor type isn't supported and emulate == true
+    // throws:  std::system_error or std::bad_alloc on failure
+    bool add_fd_watch(int fd, void *userdata, int flags, bool enabled = true, bool emulate = false)
+    {
+        short filter = (flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE;
+
+        if (filter == EVFILT_READ && POLL_SEMANTICS == 0 && emulate) {
+            // We can't request poll semantics, so check for regular file:
+            struct stat statbuf;
+            if (fstat(fd, &statbuf) == -1) {
+                throw new std::system_error(errno, std::system_category());
+            }
+            if ((statbuf.st_mode & S_IFMT) == S_IFREG) {
+                // Regular file: emulation required
+                return false;
+            }
+        }
+
+        int fflags = (filter == EVFILT_READ) ? POLL_SEMANTICS : 0;
+
+        struct kevent kev;
+        EV_SET(&kev, fd, filter, EV_ADD | (enabled ? 0 : EV_DISABLE), fflags, 0, userdata);
+        if (kevent(kqfd, &kev, 1, nullptr, 0, nullptr) == -1) {
+            // Note that kqueue supports EVFILT_READ on regular file fd's, but not EVFILT_WRITE.
+            if (filter == EVFILT_WRITE && errno == EINVAL && emulate) {
+                return false; // emulate
+            }
+            throw new std::system_error(errno, std::system_category());
+        }
+        return true;
+    }
+
+    // returns: 0 on success
+    //          IN_EVENTS  if in watch requires emulation
+    //          OUT_EVENTS if out watch requires emulation
+    int add_bidi_fd_watch(int fd, void *userdata, int flags, bool emulate = false)
+    {
+#ifdef EV_RECEIPT
+        struct kevent kev[2];
+        struct kevent kev_r[2];
+        short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT;
+        short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT;
+        EV_SET(&kev[0], fd, EVFILT_READ, rflags, POLL_SEMANTICS, 0, userdata);
+        EV_SET(&kev[1], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
+
+        int r = kevent(kqfd, kev, 2, kev_r, 2, nullptr);
+
+        if (r == -1) {
+            throw new std::system_error(errno, std::system_category());
+        }
+
+        // Some possibilities:
+        // - both ends failed. We'll throw an error rather than allowing emulation.
+        // - read watch failed, write succeeded : should not happen.
+        // - read watch added, write failed: if emulate == true, succeed;
+        //                                   if emulate == false, remove read and fail.
+
+        if (kev_r[0].data != 0) {
+            // read failed
+            throw new std::system_error(kev_r[0].data, std::system_category());
+        }
+
+        if (kev_r[1].data != 0) {
+            if (emulate) {
+                // We can emulate, but, do we have correct semantics?
+                if (POLL_SEMANTICS != 0) {
+                    return OUT_EVENTS;
+                }
+
+                // if we can't get poll semantics, emulate for read as well:
+                // first remove read watch:
+                EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
+                kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+                return IN_EVENTS | OUT_EVENTS;
+            }
+            // remove read watch
+            EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
+            kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+            // throw exception
+            throw new std::system_error(kev_r[1].data, std::system_category());
+        }
+
+        return 0;
+#else
+        // OpenBSD doesn't have EV_RECEIPT: install the watches one at a time
+        struct kevent kev[1];
+
+        short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE);
+        short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE);
+        EV_SET(&kev[0], fd, EVFILT_READ, rflags, 0, 0, userdata);
+
+        int r = kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+
+        if (r == -1) {
+            throw new std::system_error(errno, std::system_category());
+        }
+
+        EV_SET(&kev[0], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
+
+        r = kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+
+        if (r == -1) {
+            if (emulate) {
+                return OUT_EVENTS;
+            }
+            // remove read watch
+            EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
+            kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+            // throw exception
+            throw new std::system_error(errno, std::system_category());
+        }
+
+        return 0;
+#endif
+    }
+
+    // flags specifies which watch to remove; ignored if the loop doesn't support
+    // separate read/write watches.
+    void remove_fd_watch(int fd, int flags)
+    {
+        remove_filter((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd);
+    }
+
+    void remove_fd_watch_nolock(int fd, int flags)
+    {
+        remove_fd_watch(fd, flags);
+    }
+
+    void remove_bidi_fd_watch(int fd) noexcept
+    {
+        struct kevent kev[2];
+        EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
+        EV_SET(&kev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
+
+        kevent(kqfd, kev, 2, nullptr, 0, nullptr);
+    }
+
+    void enable_fd_watch(int fd, void *userdata, int flags)
+    {
+        set_filter_enabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, userdata, true);
+    }
+
+    void enable_fd_watch_nolock(int fd, void *userdata, int flags)
+    {
+        enable_fd_watch(fd, userdata, flags);
+    }
+
+    void disable_fd_watch(int fd, int flags)
+    {
+        set_filter_enabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, nullptr, false);
+    }
+
+    void disable_fd_watch_nolock(int fd, int flags)
+    {
+        disable_fd_watch(fd, flags);
+    }
+
+    // If events are pending, process an unspecified number of them.
+    // If no events are pending, wait until one event is received and
+    // process this event (and possibly any other events received
+    // simultaneously).
+    // If processing an event removes a watch, there is a possibility
+    // that the watched event will still be reported (if it has
+    // occurred) before pull_events() returns.
+    //
+    //  do_wait - if false, returns immediately if no events are
+    //            pending.
+    void pull_events(bool do_wait)
+    {
+        struct kevent events[16];
+        struct timespec ts;
+
+        Base::lock.lock();
+        sigset_t sigmask = this->get_active_sigmask();
+        Base::lock.unlock();
+
+        volatile bool was_signalled = false;
+
+        // using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is
+        // received during polling, it will longjmp back to here:
+        if (sigsetjmp(this->get_sigreceive_jmpbuf(), 1) != 0) {
+            this->process_signal(sigmask);
+            was_signalled = true;
+        }
+
+        if (was_signalled) {
+            do_wait = false;
+        }
+
+        ts.tv_sec = 0;
+        ts.tv_nsec = 0;
+
+        std::atomic_signal_fence(std::memory_order_release);
+        this->sigmaskf(SIG_UNBLOCK, &sigmask, nullptr);
+        int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts);
+        this->sigmaskf(SIG_BLOCK, &sigmask, nullptr);
+        if (r == -1 || r == 0) {
+            // signal or no events
+            return;
+        }
+
+        do {
+            process_events(events, r);
+            r = kevent(kqfd, nullptr, 0, events, 16, &ts);
+        } while (r > 0);
+    }
+};
+
+} // end namespace
index a10e81a7f8d0a653043eebebe6f80144b5bd5c5e..29a1ae35b627ae17ea6af2734beac1f475825e9a 100644 (file)
@@ -13,6 +13,7 @@
 #include <setjmp.h>
 
 #include "dasynq-config.h"
+#include "dasynq-signal.h"
 
 // "pselect"-based event loop mechanism.
 //
@@ -21,40 +22,10 @@ namespace dasynq {
 
 template <class Base> class select_events;
 
-class select_traits
+class select_traits : public signal_traits
 {
     public:
 
-    class sigdata_t
-    {
-        template <class Base> friend class select_events;
-
-        siginfo_t info;
-
-        public:
-        // mandatory:
-        int get_signo() { return info.si_signo; }
-        int get_sicode() { return info.si_code; }
-        pid_t get_sipid() { return info.si_pid; }
-        uid_t get_siuid() { return info.si_uid; }
-        void * get_siaddr() { return info.si_addr; }
-        int get_sistatus() { return info.si_status; }
-        int get_sival_int() { return info.si_value.sival_int; }
-        void * get_sival_ptr() { return info.si_value.sival_ptr; }
-
-        // XSI
-        int get_sierrno() { return info.si_errno; }
-
-        // XSR (streams) OB (obselete)
-#if !defined(__OpenBSD__)
-        // Note: OpenBSD doesn't have this; most other systems do. Technically it is part of the STREAMS
-        // interface.
-        int get_siband() { return info.si_band; }
-#endif
-
-        void set_signo(int signo) { info.si_signo = signo; }
-    };
-
     class fd_r;
 
     // File descriptor optional storage. If the mechanism can return the file descriptor, this
@@ -89,66 +60,13 @@ class select_traits
     constexpr static bool supports_non_oneshot_fd = false;
 };
 
-namespace dprivate {
-namespace select_mech {
-
-// We need to declare and define a non-static data variable, "siginfo_p", in this header, without
-// violating the "one definition rule". The only way to do that is via a template, even though we
-// don't otherwise need a template here:
-template <typename T = decltype(nullptr)> class sig_capture_templ
-{
-    public:
-    static siginfo_t siginfo_cap;
-    static sigjmp_buf rjmpbuf;
-
-    static void signal_handler(int signo, siginfo_t *siginfo, void *v)
-    {
-        siginfo_cap = *siginfo;
-        siglongjmp(rjmpbuf, 1);
-    }
-};
-template <typename T> siginfo_t sig_capture_templ<T>::siginfo_cap;
-template <typename T> sigjmp_buf sig_capture_templ<T>::rjmpbuf;
-
-using sig_capture = sig_capture_templ<>;
-
-inline void prepare_signal(int signo)
-{
-    struct sigaction the_action;
-    the_action.sa_sigaction = sig_capture::signal_handler;
-    the_action.sa_flags = SA_SIGINFO;
-    sigfillset(&the_action.sa_mask);
-
-    sigaction(signo, &the_action, nullptr);
-}
-
-inline sigjmp_buf &get_sigreceive_jmpbuf()
-{
-    return sig_capture::rjmpbuf;
-}
-
-inline void unprep_signal(int signo)
-{
-    signal(signo, SIG_DFL);
-}
-
-inline siginfo_t * get_siginfo()
-{
-    return &sig_capture::siginfo_cap;
-}
-
-} } // namespace dasynq :: select_mech
-
-template <class Base> class select_events : public Base
+template <class Base> class select_events : public signal_events<Base>
 {
     fd_set read_set;
     fd_set write_set;
     //fd_set error_set;  // logical OR of both the above
     int max_fd = 0; // highest fd in any of the sets
 
-    sigset_t active_sigmask; // mask out unwatched signals i.e. active=0
-    void * sig_userdata[NSIG];
-
     // userdata pointers in read and write respectively, for each fd:
     std::vector<void *> rd_udata;
     std::vector<void *> wr_udata;
@@ -159,7 +77,6 @@ template <class Base> class select_events : public Base
     //   receive_signal(sigdata_t &, user *) noexcept
     //   receive_fd_event(fd_r, user *, int flags) noexcept
 
-    using sigdata_t = select_traits::sigdata_t;
     using fd_r = typename select_traits::fd_r;
 
     void process_events(fd_set *read_set_p, fd_set *write_set_p, fd_set *error_set_p)
@@ -204,7 +121,6 @@ template <class Base> class select_events : public Base
     {
         FD_ZERO(&read_set);
         FD_ZERO(&write_set);
-        sigfillset(&active_sigmask);
         Base::init(this);
     }
 
@@ -333,44 +249,6 @@ template <class Base> class select_events : public Base
         disable_fd_watch_nolock(fd, flags);
     }
 
-    // Note signal should be masked before call.
-    void add_signal_watch(int signo, void *userdata)
-    {
-        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
-        add_signal_watch_nolock(signo, userdata);
-    }
-
-    // Note signal should be masked before call.
-    void add_signal_watch_nolock(int signo, void *userdata)
-    {
-        sig_userdata[signo] = userdata;
-        sigdelset(&active_sigmask, signo);
-        dprivate::select_mech::prepare_signal(signo);
-    }
-
-    // Note, called with lock held:
-    void rearm_signal_watch_nolock(int signo, void *userdata) noexcept
-    {
-        sig_userdata[signo] = userdata;
-        sigdelset(&active_sigmask, signo);
-    }
-
-    void remove_signal_watch_nolock(int signo) noexcept
-    {
-        dprivate::select_mech::unprep_signal(signo);
-        sigaddset(&active_sigmask, signo);
-        sig_userdata[signo] = nullptr;
-        // No need to signal other threads
-    }
-
-    void remove_signal_watch(int signo) noexcept
-    {
-        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
-        remove_signal_watch_nolock(signo);
-    }
-
-    public:
-
     // If events are pending, process an unspecified number of them.
     // If no events are pending, wait until one event is received and
     // process this event (and possibly any other events received
@@ -383,7 +261,7 @@ template <class Base> class select_events : public Base
     //            pending.
     void pull_events(bool do_wait) noexcept
     {
-        using namespace dprivate::select_mech;
+        //using namespace dprivate::select_mech;
 
         struct timespec ts;
         ts.tv_sec = 0;
@@ -398,6 +276,8 @@ template <class Base> class select_events : public Base
         write_set_c = write_set;
         err_set = read_set;
 
+        const sigset_t &active_sigmask = this->get_active_sigmask();
+
         sigset_t sigmask;
         this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask);
         // This is horrible, but hopefully will be optimised well. POSIX gives no way to combine signal
@@ -414,18 +294,8 @@ template <class Base> class select_events : public Base
 
         // using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is
         // received during polling, it will longjmp back to here:
-        if (sigsetjmp(get_sigreceive_jmpbuf(), 1) != 0) {
-            std::atomic_signal_fence(std::memory_order_acquire);
-            auto * sinfo = get_siginfo();
-            sigdata_t sigdata;
-            sigdata.info = *sinfo;
-            Base::lock.lock();
-            void *udata = sig_userdata[sinfo->si_signo];
-            if (udata != nullptr && Base::receive_signal(*this, sigdata, udata)) {
-                sigaddset(&sigmask, sinfo->si_signo);
-                sigaddset(&active_sigmask, sinfo->si_signo);
-            }
-            Base::lock.unlock();
+        if (sigsetjmp(this->get_sigreceive_jmpbuf(), 1) != 0) {
+            this->process_signal(sigmask);
             was_signalled = true;
         }
 
diff --git a/src/dasynq/dasynq-signal.h b/src/dasynq/dasynq-signal.h
new file mode 100644 (file)
index 0000000..dc9a840
--- /dev/null
@@ -0,0 +1,215 @@
+#include <atomic>
+
+#include <signal.h>
+#include <setjmp.h>
+
+// Support for the standard POSIX signal mechanisms. This can be used by backends that don't
+// otherwise support receiving signals. It is not particularly nice (it involves using longjmp
+// out of a signal handler, which POSIX mildly frowns upon) but it's really the only viable way
+// to process signals together with file descriptor / other events and obtain the full siginfo_t
+// data passed to the signal handler.
+
+namespace dasynq {
+
+class signal_traits
+{
+    public:
+
+    class sigdata_t
+    {
+        template <typename, bool> friend class signal_events;
+
+        siginfo_t info;
+
+        public:
+        // mandatory:
+        int get_signo() { return info.si_signo; }
+        int get_sicode() { return info.si_code; }
+        pid_t get_sipid() { return info.si_pid; }
+        uid_t get_siuid() { return info.si_uid; }
+        void * get_siaddr() { return info.si_addr; }
+        int get_sistatus() { return info.si_status; }
+        int get_sival_int() { return info.si_value.sival_int; }
+        void * get_sival_ptr() { return info.si_value.sival_ptr; }
+
+        // XSI
+        int get_sierrno() { return info.si_errno; }
+
+        // XSR (streams) OB (obselete)
+#if !defined(__OpenBSD__)
+        // Note: OpenBSD doesn't have this; most other systems do. Technically it is part of the STREAMS
+        // interface.
+        int get_siband() { return info.si_band; }
+#endif
+
+        void set_signo(int signo) { info.si_signo = signo; }
+    };
+
+    constexpr static bool interrupt_after_signal_add = true;
+};
+
+namespace dprivate {
+namespace signal_mech {
+
+// We need to declare and define a non-static data variable, "siginfo_p", in this header, without
+// violating the "one definition rule". The only way to do that is via a template, even though we
+// don't otherwise need a template here:
+template <typename T = decltype(nullptr)> class sig_capture_templ
+{
+    public:
+    static siginfo_t siginfo_cap;
+    static sigjmp_buf rjmpbuf;
+
+    static void signal_handler(int signo, siginfo_t *siginfo, void *v)
+    {
+        siginfo_cap = *siginfo;
+        siglongjmp(rjmpbuf, 1);
+    }
+};
+template <typename T> siginfo_t sig_capture_templ<T>::siginfo_cap;
+template <typename T> sigjmp_buf sig_capture_templ<T>::rjmpbuf;
+
+using sig_capture = sig_capture_templ<>;
+
+inline void prepare_signal(int signo)
+{
+    struct sigaction the_action;
+    the_action.sa_sigaction = sig_capture::signal_handler;
+    the_action.sa_flags = SA_SIGINFO;
+    sigfillset(&the_action.sa_mask);
+
+    sigaction(signo, &the_action, nullptr);
+}
+
+inline sigjmp_buf &get_sigreceive_jmpbuf()
+{
+    return sig_capture::rjmpbuf;
+}
+
+inline void unprep_signal(int signo)
+{
+    signal(signo, SIG_DFL);
+}
+
+inline siginfo_t * get_siginfo()
+{
+    return &sig_capture::siginfo_cap;
+}
+
+} } // namespace dasynq :: signal_mech
+
+// signal_events template.
+//
+// Active (watched and enabled) signals are maintained as a signal mask, which either has active
+// signals in the mask or inactive signals in the mask, depending on the mask_enables parameter.
+// (if mask_enables is true, active signals are in the mask). Which is more convenient depends
+// exactly on how the mask will be used.
+//
+template <class Base, bool mask_enables = false> class signal_events : public Base
+{
+    sigset_t active_sigmask; // mask out unwatched signals i.e. active=0
+    void * sig_userdata[NSIG];
+
+    using sigdata_t = signal_traits::sigdata_t;
+
+    protected:
+
+    signal_events()
+    {
+        if (mask_enables) {
+            sigemptyset(&active_sigmask);
+        }
+        else {
+            sigfillset(&active_sigmask);
+        }
+    }
+
+    const sigset_t &get_active_sigmask()
+    {
+        return active_sigmask;
+    }
+
+    sigjmp_buf &get_sigreceive_jmpbuf()
+    {
+        return dprivate::signal_mech::get_sigreceive_jmpbuf();
+    }
+
+    void process_signal(sigset_t &sigmask)
+    {
+        using namespace dprivate::signal_mech;
+        std::atomic_signal_fence(std::memory_order_acquire);
+        auto * sinfo = get_siginfo();
+        sigdata_t sigdata;
+        sigdata.info = *sinfo;
+
+        Base::lock.lock();
+        void *udata = sig_userdata[sinfo->si_signo];
+        if (udata != nullptr && Base::receive_signal(*this, sigdata, udata)) {
+            if (mask_enables) {
+                sigdelset(&sigmask, sinfo->si_signo);
+                sigdelset(&active_sigmask, sinfo->si_signo);
+            }
+            else {
+                sigaddset(&sigmask, sinfo->si_signo);
+                sigaddset(&active_sigmask, sinfo->si_signo);
+            }
+        }
+        Base::lock.unlock();
+    }
+
+    public:
+
+    // Note signal should be masked before call.
+    void add_signal_watch(int signo, void *userdata)
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        add_signal_watch_nolock(signo, userdata);
+    }
+
+    // Note signal should be masked before call.
+    void add_signal_watch_nolock(int signo, void *userdata)
+    {
+        sig_userdata[signo] = userdata;
+        if (mask_enables) {
+            sigaddset(&active_sigmask, signo);
+        }
+        else {
+            sigdelset(&active_sigmask, signo);
+        }
+        dprivate::signal_mech::prepare_signal(signo);
+    }
+
+    // Note, called with lock held:
+    void rearm_signal_watch_nolock(int signo, void *userdata) noexcept
+    {
+        sig_userdata[signo] = userdata;
+        if (mask_enables) {
+            sigaddset(&active_sigmask, signo);
+        }
+        else {
+            sigdelset(&active_sigmask, signo);
+        }
+    }
+
+    void remove_signal_watch_nolock(int signo) noexcept
+    {
+        dprivate::signal_mech::unprep_signal(signo);
+        if (mask_enables) {
+            sigdelset(&active_sigmask, signo);
+        }
+        else {
+            sigaddset(&active_sigmask, signo);
+        }
+        sig_userdata[signo] = nullptr;
+        // No need to signal other threads
+    }
+
+    void remove_signal_watch(int signo) noexcept
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        remove_signal_watch_nolock(signo);
+    }
+
+};
+
+}
index 68175c4daa88e045cbe86f26bf4c31f5876db45f..b8317406f0246927d2d308d1df07534d7d66b17d 100644 (file)
@@ -8,21 +8,24 @@
 #include "dasynq-interrupt.h"
 #include "dasynq-util.h"
 
-// Dasynq uses a "mix-in" pattern to produce an event loop implementation incorporating selectable implementations of
-// various components (main backend, timers, child process watch mechanism etc). In C++ this can be achieved by
-// a template for some component which extends its own type parameter:
+// Dasynq uses a "mix-in" pattern to produce an event loop implementation incorporating selectable
+// implementations of various components (main backend, timers, child process watch mechanism etc). In C++
+// this can be achieved by a template for some component which extends its own type parameter:
 //
 //     template <typename Base> class X : public B { .... }
 //
-// We can chain several such components together (and so so below) to "mix in" the functionality of each into the final
-// class, eg:
+// (Note that in a sense this is actually the opposite of the so-called "Curiously Recurring Template"
+// pattern, which can be used to achieve a similar goal). We can chain several such components together to
+// "mix in" the functionality of each into the final class, eg:
 //
-//     template <typename T> using Loop = epoll_loop<interrupt_channel<timer_fd_events<child_proc_events<T>>>>;
+//     template <typename T> using loop_t =
+//         epoll_loop<interrupt_channel<timer_fd_events<child_proc_events<T>>>>;
 //
-// (which defines an alias template "Loop", whose implementation will use the epoll backend, a standard interrupt channel
-// implementation, a timerfd-based timer implementation, and the standard child process watch implementation).
-// We sometimes need the base class to be able to call derived-class members: to do this we pass a reference to
-// the derived instance into a templated method in the base, called "init":
+// (which defines an alias template "loop_t", whose implementation will use the epoll backend, a standard
+// interrupt channel implementation, a timerfd-based timer implementation, and the standard child process
+// watch implementation). We sometimes need the base class to be able to call derived-class members: to do
+// this we pass a reference to the derived instance into a template member function in the base, for example
+// the "init" function:
 //
 //     template <typename T> void init(T *derived)
 //     {
 //         Base::init(derived);
 //     }
 //
-// At the base all this is the event_dispatch class, defined below, which receives event notifications and inserts
-// them into a queue for processing. The event_loop class, also below, wraps this (via composition) in an interface
-// which can be used to register/de-regsiter/enable/disable event watchers, and which can process the queued events
-// by calling the watcher callbacks. The event_loop class also provides some synchronisation to ensure thread-safety.
-
-#if DASYNQ_HAVE_KQUEUE
-#include "dasynq-kqueue.h"
+// The 'loop_t' defined above is a template for a usable backend mechanism for the event_loop template
+// class. At the base all this is the event_dispatch class, defined below, which receives event
+// notifications and inserts them into a queue for processing. The event_loop class, also below, wraps this
+// (via composition) in an interface which can be used to register/de-register/enable/disable event
+// watchers, and which can process the queued events by calling the watcher callbacks. The event_loop class
+// also provides some synchronisation to ensure thread-safety, and abstracts away some differences between
+// backends.
+//
+// The differences are exposed as traits, partly via a separate traits class (loop_traits_t as defined
+// below, which contains the "main" traits, particularly the sigdata_t, fd_r and fd_s types). Note that the
+// event_dispatch class exposes the loop traits as traits_t, and these are then potentially augmented at
+// each stage of the mechanism inheritance chain (i.e. the final traits are exposed as
+// `loop_t<event_dispatch>::traits_t'.
+//
+// The trait members are:
+//   sigdata_t  - a wrapper for the siginfo_t type or equivalent used to pass signal parameters
+//   fd_r       - a file descriptor wrapper, if the backend is able to retrieve the file descriptor when
+//                it receives an fd event. Not all backends can do this.
+//   fd_s       - a file descriptor storage wrapper. If the backend can retrieve file descriptors, this
+//                will be empty (and ideally zero-size), otherwise it stores a file descriptor.
+//                With an fd_r and fd_s instance you can always retrieve the file descriptor:
+//                `fdr.get_fd(fds)' will return it.
+//   has_bidi_fd_watch
+//              - boolean indicating whether a single watch can support watching for both input and output
+//                events simultaneously
+//   has_separate_rw_fd_watches
+//              - boolean indicating whether it is possible to add separate input and output watches for the
+//                same fd. Either this or has_bidi_fd_watch must be true.
+//   interrupt_after_fd_add
+//              - boolean indicating if a loop interrupt must be forced after adding/enabling an fd watch.
+//   interrupt_after_signal_add
+//              - boolean indicating if a loop interrupt must be forced after adding or enabling a signal
+//                watch.
+//   supports_non_oneshot_fd
+//              - boolean; if true, event_dispatch can arm an fd watch without ONESHOT and returning zero
+//                events from receive_fd_event (the event notification function) will leave the descriptor
+//                armed. If false, all fd watches are effectively ONESHOT (they can be re-armed immediately
+//                after delivery by returning an appropriate event flag mask).
+//   full_timer_support
+//              - boolean indicating that the monotonic and system clocks are actually different clocks and
+//                that timers against the system clock will work correctly if the system clock time is
+//                adjusted. If false, the monotic clock may not be present at all (monotonic clock will map
+//                to system clock), and timers against either clock are not guaranteed to work correctly if
+//                the system clock is adjusted.
+
+#if DASYNQ_HAVE_EPOLL <= 0
 #if _POSIX_TIMERS > 0
 #include "dasynq-posixtimer.h"
 namespace dasynq {
@@ -50,11 +92,24 @@ namespace dasynq {
     template <typename T> using timer_events = itimer_events<T>;
 }
 #endif
+#endif
+
+#if DASYNQ_HAVE_KQUEUE
+#if DASYNQ_KQUEUE_MACOS_WORKAROUND
+#include "dasynq-kqueue-macos.h"
+#include "dasynq-childproc.h"
+namespace dasynq {
+    template <typename T> using loop_t = macos_kqueue_loop<interrupt_channel<timer_events<child_proc_events<T>>>>;
+    using loop_traits_t = macos_kqueue_traits;
+}
+#else
+#include "dasynq-kqueue.h"
 #include "dasynq-childproc.h"
 namespace dasynq {
     template <typename T> using loop_t = kqueue_loop<interrupt_channel<timer_events<child_proc_events<T>>>>;
     using loop_traits_t = kqueue_traits;
 }
+#endif
 #elif DASYNQ_HAVE_EPOLL
 #include "dasynq-epoll.h"
 #include "dasynq-timerfd.h"
@@ -143,6 +198,7 @@ namespace dprivate {
         typedef std::condition_variable_any condvar;
     };
 
+    // For a single-threaded loop, the waitqueue is a no-op:
     template <> class waitqueue_node<null_mutex>
     {
         // Specialised waitqueue_node for null_mutex.
@@ -248,21 +304,72 @@ namespace dprivate {
         }
     };
     
+    // friend of event_loop for giving access to various private members
+    class loop_access {
+        public:
+        template <typename Loop>
+        static typename Loop::mutex_t &get_base_lock(Loop &loop) noexcept
+        {
+            return loop.get_base_lock();
+        }
+
+        template <typename Loop>
+        static rearm process_fd_rearm(Loop &loop, typename Loop::base_fd_watcher *bfw,
+                rearm rearm_type, bool is_multi_watch) noexcept
+        {
+            return loop.process_fd_rearm(bfw, rearm_type, is_multi_watch);
+        }
+
+        template <typename Loop>
+        static rearm process_secondary_rearm(Loop &loop, typename Loop::base_bidi_fd_watcher * bdfw,
+                base_watcher * outw, rearm rearm_type) noexcept
+        {
+            return loop.process_secondary_rearm(bdfw, outw, rearm_type);
+        }
+
+        template <typename Loop>
+        static void process_signal_rearm(Loop &loop, typename Loop::base_signal_watcher * bsw,
+                rearm rearm_type) noexcept
+        {
+            loop.process_signal_rearm(bsw, rearm_type);
+        }
+
+        template <typename Loop>
+        static void process_child_watch_rearm(Loop &loop, typename Loop::base_child_watcher *bcw,
+                rearm rearm_type) noexcept
+        {
+            loop.process_child_watch_rearm(bcw, rearm_type);
+        }
+
+        template <typename Loop>
+        static void process_timer_rearm(Loop &loop, typename Loop::base_timer_watcher *btw,
+                rearm rearm_type) noexcept
+        {
+            loop.process_timer_rearm(btw, rearm_type);
+        }
+
+        template <typename Loop>
+        static void requeue_watcher(Loop &loop, base_watcher *watcher) noexcept
+        {
+            loop.requeue_watcher(watcher);
+        }
+    };
+
     // Do standard post-dispatch processing for a watcher. This handles the case of removing or
-    // re-queing watchers depending on the rearm type.
+    // re-queueing watchers depending on the rearm type.
     template <typename Loop> void post_dispatch(Loop &loop, base_watcher *watcher, rearm rearm_type)
     {
         if (rearm_type == rearm::REMOVE) {
-            loop.get_base_lock().unlock();
+            loop_access::get_base_lock(loop).unlock();
             watcher->watch_removed();
-            loop.get_base_lock().lock();
+            loop_access::get_base_lock(loop).lock();
         }
         else if (rearm_type == rearm::REQUEUE) {
-            loop.requeue_watcher(watcher);
+            loop_access::requeue_watcher(loop, watcher);
         }
     }
 
-    // This class serves as the base class (mixin) for the AEN mechanism class.
+    // This class serves as the base class (mixin) for the backend mechanism.
     //
     // The event_dispatch class maintains the queued event data structures. It inserts watchers
     // into the queue when events are received (receiveXXX methods). It also owns a mutex used
@@ -271,12 +378,12 @@ namespace dprivate {
     // In general the methods should be called with lock held. In practice this means that the
     // event loop backend implementations must obtain the lock; they are also free to use it to
     // protect their own internal data structures.
-    template <typename T_Mutex, typename Traits, typename LoopTraits> class event_dispatch
+    template <typename Traits, typename LoopTraits> class event_dispatch
     {
-        friend class dasynq::event_loop<T_Mutex, LoopTraits>;;
+        friend class dasynq::event_loop<typename LoopTraits::mutex_t, LoopTraits>;;
 
         public:
-        using mutex_t = T_Mutex;
+        using mutex_t = typename LoopTraits::mutex_t;
         using traits_t = Traits;
 
         private:
@@ -284,11 +391,9 @@ namespace dprivate {
         // queue data structure/pointer
         prio_queue event_queue;
         
-        using base_signal_watcher = dasynq::dprivate::base_signal_watcher<mutex_t, typename traits_t::sigdata_t>;
-        using base_fd_watcher = dasynq::dprivate::base_fd_watcher<mutex_t>;
-        using base_bidi_fd_watcher = dasynq::dprivate::base_bidi_fd_watcher<mutex_t>;
-        using base_child_watcher = dasynq::dprivate::base_child_watcher<mutex_t>;
-        using base_timer_watcher = dasynq::dprivate::base_timer_watcher<mutex_t>;
+        using base_signal_watcher = dprivate::base_signal_watcher<typename traits_t::sigdata_t>;
+        using base_child_watcher = dprivate::base_child_watcher;
+        using base_timer_watcher = dprivate::base_timer_watcher;
         
         // Add a watcher into the queueing system (but don't queue it). Call with lock held.
         //   may throw: std::bad_alloc
@@ -439,7 +544,7 @@ namespace dprivate {
             }
         }
         
-        // Queue a watcher for reomval, or issue "removed" callback to it.
+        // Queue a watcher for removal, or issue "removed" callback to it.
         // Call with lock free.
         void issue_delete(base_bidi_fd_watcher *watcher) noexcept
         {
@@ -478,34 +583,39 @@ namespace dprivate {
         event_dispatch() {  }
         event_dispatch(const event_dispatch &) = delete;
     };
+
 }
 
-// This is the main event_loop implementation. It serves as an interface to the event loop
-// backend (of which it maintains an internal instance). It also serialises waits the backend
-// and provides safe deletion of watchers (see comments inline).
+// This is the main event_loop implementation. It serves as an interface to the event loop backend (of which
+// it maintains an internal instance). It also serialises polling the backend and provides safe deletion of
+// watchers (see comments inline).
+//
+// The T_Mutex type parameter specifies the mutex type. A null_mutex can be used for a single-threaded event
+// loop; std::mutex, or any mutex providing a compatible interface, can be used for a thread-safe event
+// loop.
+//
+// The Traits type parameter specifies any required traits for the event loop.  This specifies the back-end
+// to use (backend_t, a template) and the basic back-end traits (backend_traits_t).
+// The default is `default_traits<T_Mutex>'.
+//
 template <typename T_Mutex, typename Traits>
 class event_loop
 {
     using my_event_loop_t = event_loop<T_Mutex, Traits>;
+
     friend class dprivate::fd_watcher<my_event_loop_t>;
     friend class dprivate::bidi_fd_watcher<my_event_loop_t>;
     friend class dprivate::signal_watcher<my_event_loop_t>;
     friend class dprivate::child_proc_watcher<my_event_loop_t>;
     friend class dprivate::timer<my_event_loop_t>;
     
-    friend void dprivate::post_dispatch<my_event_loop_t>(my_event_loop_t &loop,
-            dprivate::base_watcher *watcher, rearm rearm_type);
-
-    template <typename, typename> friend class dprivate::fd_watcher_impl;
-    template <typename, typename> friend class dprivate::bidi_fd_watcher_impl;
-    template <typename, typename> friend class dprivate::signal_watcher_impl;
-    template <typename, typename> friend class dprivate::child_proc_watcher_impl;
-    template <typename, typename> friend class dprivate::timer_impl;
+    friend class dprivate::loop_access;
 
     using backend_traits_t = typename Traits::backend_traits_t;
 
-    template <typename T, typename U> using event_dispatch = dprivate::event_dispatch<T,U,Traits>;
-    using loop_mech_t = typename Traits::template backend_t<event_dispatch<T_Mutex, backend_traits_t>>;
+    template <typename T> using event_dispatch = dprivate::event_dispatch<T,Traits>;
+    using dispatch_t = event_dispatch<backend_traits_t>;
+    using loop_mech_t = typename Traits::template backend_t<dispatch_t>;
     using reaper_mutex_t = typename loop_mech_t::reaper_mutex_t;
 
     public:
@@ -517,11 +627,11 @@ class event_loop
     template <typename T> using waitqueue = dprivate::waitqueue<T>;
     template <typename T> using waitqueue_node = dprivate::waitqueue_node<T>;
     using base_watcher = dprivate::base_watcher;
-    using base_signal_watcher = dprivate::base_signal_watcher<T_Mutex, typename loop_traits_t::sigdata_t>;
-    using base_fd_watcher = dprivate::base_fd_watcher<T_Mutex>;
-    using base_bidi_fd_watcher = dprivate::base_bidi_fd_watcher<T_Mutex>;
-    using base_child_watcher = dprivate::base_child_watcher<T_Mutex>;
-    using base_timer_watcher = dprivate::base_timer_watcher<T_Mutex>;
+    using base_signal_watcher = dprivate::base_signal_watcher<typename loop_traits_t::sigdata_t>;
+    using base_fd_watcher = dprivate::base_fd_watcher;
+    using base_bidi_fd_watcher = dprivate::base_bidi_fd_watcher;
+    using base_child_watcher = dprivate::base_child_watcher;
+    using base_timer_watcher = dprivate::base_timer_watcher;
     using watch_type_t = dprivate::watch_type_t;
 
     loop_mech_t loop_mech;
@@ -585,8 +695,7 @@ class event_loop
 
     void register_signal(base_signal_watcher *callBack, int signo)
     {
-        auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
-        std::lock_guard<mutex_t> guard(ed.lock);
+        std::lock_guard<mutex_t> guard(loop_mech.lock);
 
         loop_mech.prepare_watcher(callBack);
         try {
@@ -608,16 +717,14 @@ class event_loop
         waitqueue_node<T_Mutex> qnode;
         get_attn_lock(qnode);
         
-        auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
-        ed.issue_delete(callBack);
+        loop_mech.issue_delete(callBack);
         
         release_lock(qnode);
     }
 
     void register_fd(base_fd_watcher *callback, int fd, int eventmask, bool enabled, bool emulate = false)
     {
-        auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
-        std::lock_guard<mutex_t> guard(ed.lock);
+        std::lock_guard<mutex_t> guard(loop_mech.lock);
 
         loop_mech.prepare_watcher(callback);
 
@@ -642,10 +749,11 @@ class event_loop
         }
     }
     
+    // Register a bidi fd watcher. The watch_flags should already be set to the eventmask to watch
+    // (i.e. eventmask == callback->watch_flags is a pre-condition).
     void register_fd(base_bidi_fd_watcher *callback, int fd, int eventmask, bool emulate = false)
     {
-        auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
-        std::lock_guard<mutex_t> guard(ed.lock);
+        std::lock_guard<mutex_t> guard(loop_mech.lock);
 
         loop_mech.prepare_watcher(callback);
         try {
@@ -657,6 +765,7 @@ class event_loop
                     if (r & IN_EVENTS) {
                         callback->emulatefd = true;
                         if (eventmask & IN_EVENTS) {
+                            callback->watch_flags &= ~IN_EVENTS;
                             requeue_watcher(callback);
                         }
                     }
@@ -667,6 +776,7 @@ class event_loop
                     if (r & OUT_EVENTS) {
                         callback->out_watcher.emulatefd = true;
                         if (eventmask & OUT_EVENTS) {
+                            callback->watch_flags &= ~OUT_EVENTS;
                             requeue_watcher(&callback->out_watcher);
                         }
                     }
@@ -679,9 +789,11 @@ class event_loop
                         callback->emulatefd = true;
                         callback->out_watcher.emulatefd = true;
                         if (eventmask & IN_EVENTS) {
+                            callback->watch_flags &= ~IN_EVENTS;
                             requeue_watcher(callback);
                         }
                         if (eventmask & OUT_EVENTS) {
+                            callback->watch_flags &= ~OUT_EVENTS;
                             requeue_watcher(&callback->out_watcher);
                         }
                     }
@@ -734,7 +846,7 @@ class event_loop
     void deregister(base_fd_watcher *callback, int fd) noexcept
     {
         if (callback->emulatefd) {
-            auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
+            auto & ed = (dispatch_t &) loop_mech;
             ed.issue_delete(callback);
             return;
         }
@@ -744,7 +856,7 @@ class event_loop
         waitqueue_node<T_Mutex> qnode;
         get_attn_lock(qnode);
         
-        auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
+        auto & ed = (dispatch_t &) loop_mech;
         ed.issue_delete(callback);
         
         release_lock(qnode);        
@@ -762,7 +874,7 @@ class event_loop
         waitqueue_node<T_Mutex> qnode;
         get_attn_lock(qnode);
         
-        event_dispatch<T_Mutex, backend_traits_t> & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
+        dispatch_t & ed = (dispatch_t &) loop_mech;
         ed.issue_delete(callback);
         
         release_lock(qnode);
@@ -770,8 +882,7 @@ class event_loop
     
     void reserve_child_watch(base_child_watcher *callback)
     {
-        auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
-        std::lock_guard<mutex_t> guard(ed.lock);
+        std::lock_guard<mutex_t> guard(loop_mech.lock);
 
         loop_mech.prepare_watcher(callback);
         try {
@@ -785,8 +896,7 @@ class event_loop
     
     void unreserve(base_child_watcher *callback) noexcept
     {
-        auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
-        std::lock_guard<mutex_t> guard(ed.lock);
+        std::lock_guard<mutex_t> guard(loop_mech.lock);
 
         loop_mech.unreserve_child_watch(callback->watch_handle);
         loop_mech.release_watcher(callback);
@@ -794,8 +904,7 @@ class event_loop
     
     void register_child(base_child_watcher *callback, pid_t child)
     {
-        auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
-        std::lock_guard<mutex_t> guard(ed.lock);
+        std::lock_guard<mutex_t> guard(loop_mech.lock);
         
         loop_mech.prepare_watcher(callback);
         try {
@@ -824,8 +933,7 @@ class event_loop
         waitqueue_node<T_Mutex> qnode;
         get_attn_lock(qnode);
         
-        event_dispatch<T_Mutex, backend_traits_t> & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
-        ed.issue_delete(callback);
+        loop_mech.issue_delete(callback);
         
         release_lock(qnode);
     }
@@ -839,8 +947,7 @@ class event_loop
 
     void register_timer(base_timer_watcher *callback, clock_type clock)
     {
-        auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
-        std::lock_guard<mutex_t> guard(ed.lock);
+        std::lock_guard<mutex_t> guard(loop_mech.lock);
     
         loop_mech.prepare_watcher(callback);
         try {
@@ -897,8 +1004,7 @@ class event_loop
         waitqueue_node<T_Mutex> qnode;
         get_attn_lock(qnode);
         
-        event_dispatch<T_Mutex, backend_traits_t> & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
-        ed.issue_delete(callback);
+        loop_mech.issue_delete(callback);
         
         release_lock(qnode);
     }
@@ -911,25 +1017,18 @@ class event_loop
     void requeue_watcher(base_watcher *watcher) noexcept
     {
         loop_mech.queue_watcher(watcher);
-
-        // We need to signal any thread that is currently waiting on the loop mechanism, so that it wakes
-        // and processes the newly queued watcher:
-
-        wait_lock.lock();
-        bool attn_q_empty = attn_waitqueue.is_empty();
-        wait_lock.unlock();
-
-        if (! attn_q_empty) {
-            loop_mech.interrupt_wait();
-        }
+        interrupt_if_necessary();
     }
 
     // Interrupt the current poll-waiter, if necessary - that is, if the loop is multi-thread safe, and if
     // there is currently another thread polling the backend event mechanism.
     void interrupt_if_necessary()
     {
-           std::lock_guard<mutex_t> guard(wait_lock);
-        if (! attn_waitqueue.is_empty()) {  // (always false for single-threaded loops)
+        wait_lock.lock();
+        bool attn_q_empty = attn_waitqueue.is_empty(); // (always false for single-threaded loops)
+        wait_lock.unlock();
+
+        if (! attn_q_empty) {
             loop_mech.interrupt_wait();
         }
     }
@@ -1055,9 +1154,8 @@ class event_loop
                 }
             }
             else if (rearm_type == rearm::REARM) {
-                bdfw->watch_flags |= IN_EVENTS;
-                
                 if (! emulatedfd) {
+                    bdfw->watch_flags |= IN_EVENTS;
                     if (! backend_traits_t::has_separate_rw_fd_watches) {
                         int watch_flags = bdfw->watch_flags;
                         set_fd_enabled_nolock(bdfw, bdfw->watch_fd,
@@ -1068,12 +1166,14 @@ class event_loop
                     }
                 }
                 else {
+                    bdfw->watch_flags &= ~IN_EVENTS;
                     rearm_type = rearm::REQUEUE;
                 }
             }
             else if (rearm_type == rearm::NOOP) {
                 if (bdfw->emulatefd) {
                     if (bdfw->watch_flags & IN_EVENTS) {
+                        bdfw->watch_flags &= ~IN_EVENTS;
                         rearm_type = rearm::REQUEUE;
                     }
                 }
@@ -1125,11 +1225,12 @@ class event_loop
                 bdfw->watch_flags &= ~OUT_EVENTS;
             }
             else if (rearm_type == rearm::REARM) {
-                bdfw->watch_flags |= OUT_EVENTS;
+                bdfw->watch_flags &= ~OUT_EVENTS;
                 rearm_type = rearm::REQUEUE;
             }
             else if (rearm_type == rearm::NOOP) {
                 if (bdfw->watch_flags & OUT_EVENTS) {
+                    bdfw->watch_flags &= ~OUT_EVENTS;
                     rearm_type = rearm::REQUEUE;
                 }
             }
@@ -1209,14 +1310,13 @@ class event_loop
     //           no limit.
     bool process_events(int limit) noexcept
     {
-        auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
-        ed.lock.lock();
+        loop_mech.lock.lock();
         
         if (limit == 0) {
             return false;
         }
         
-        base_watcher * pqueue = ed.pull_event();
+        base_watcher * pqueue = loop_mech.pull_event();
         bool active = false;
         
         while (pqueue != nullptr) {
@@ -1243,7 +1343,7 @@ class event_loop
 
                 // issue a secondary dispatch:
                 bbfw->dispatch_second(this);
-                pqueue = ed.pull_event();
+                pqueue = loop_mech.pull_event();
                 continue;
             }
 
@@ -1252,10 +1352,10 @@ class event_loop
                 limit--;
                 if (limit == 0) break;
             }
-            pqueue = ed.pull_event();
+            pqueue = loop_mech.pull_event();
         }
         
-        ed.lock.unlock();
+        loop_mech.lock.unlock();
         return active;
     }
 
@@ -1329,7 +1429,7 @@ namespace dprivate {
 
 // Posix signal event watcher
 template <typename EventLoop>
-class signal_watcher : private dprivate::base_signal_watcher<typename EventLoop::mutex_t, typename EventLoop::loop_traits_t::sigdata_t>
+class signal_watcher : private dprivate::base_signal_watcher<typename EventLoop::loop_traits_t::sigdata_t>
 {
     template <typename, typename> friend class signal_watcher_impl;
 
@@ -1396,11 +1496,11 @@ class signal_watcher_impl : public signal_watcher<EventLoop>
     void dispatch(void *loop_ptr) noexcept override
     {
         EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
-        loop.get_base_lock().unlock();
+        loop_access::get_base_lock(loop).unlock();
 
         auto rearm_type = static_cast<Derived *>(this)->received(loop, this->siginfo.get_signo(), this->siginfo);
 
-        loop.get_base_lock().lock();
+        loop_access::get_base_lock(loop).lock();
 
         if (rearm_type != rearm::REMOVED) {
 
@@ -1410,7 +1510,7 @@ class signal_watcher_impl : public signal_watcher<EventLoop>
                 rearm_type = rearm::REMOVE;
             }
 
-            loop.process_signal_rearm(this, rearm_type);
+            loop_access::process_signal_rearm(loop, this, rearm_type);
 
             post_dispatch(loop, this, rearm_type);
         }
@@ -1419,7 +1519,7 @@ class signal_watcher_impl : public signal_watcher<EventLoop>
 
 // Posix file descriptor event watcher
 template <typename EventLoop>
-class fd_watcher : private dprivate::base_fd_watcher<typename EventLoop::mutex_t>
+class fd_watcher : private dprivate::base_fd_watcher
 {
     template <typename, typename> friend class fd_watcher_impl;
 
@@ -1494,11 +1594,15 @@ class fd_watcher : private dprivate::base_fd_watcher<typename EventLoop::mutex_t
     {
         std::lock_guard<mutex_t> guard(eloop.get_base_lock());
         if (this->emulatefd) {
+            if (enable && ! this->emulate_enabled) {
+                loop_access::requeue_watcher(eloop, this);
+            }
             this->emulate_enabled = enable;
         }
         else {
             eloop.set_fd_enabled_nolock(this, this->watch_fd, this->watch_flags, enable);
         }
+
         if (! enable) {
             eloop.dequeue_watcher(this);
         }
@@ -1549,11 +1653,11 @@ class fd_watcher_impl : public fd_watcher<EventLoop>
         // In case emulating, clear enabled here; REARM or explicit set_enabled will re-enable.
         this->emulate_enabled = false;
 
-        loop.get_base_lock().unlock();
+        loop_access::get_base_lock(loop).unlock();
 
         auto rearm_type = static_cast<Derived *>(this)->fd_event(loop, this->watch_fd, this->event_flags);
 
-        loop.get_base_lock().lock();
+        loop_access::get_base_lock(loop).lock();
 
         if (rearm_type != rearm::REMOVED) {
             this->event_flags = 0;
@@ -1563,7 +1667,7 @@ class fd_watcher_impl : public fd_watcher<EventLoop>
                 rearm_type = rearm::REMOVE;
             }
 
-            rearm_type = loop.process_fd_rearm(this, rearm_type, false);
+            rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type, false);
 
             post_dispatch(loop, this, rearm_type);
         }
@@ -1575,7 +1679,7 @@ class fd_watcher_impl : public fd_watcher<EventLoop>
 // This watcher type has two event notification methods which can both potentially be
 // active at the same time.
 template <typename EventLoop>
-class bidi_fd_watcher : private dprivate::base_bidi_fd_watcher<typename EventLoop::mutex_t>
+class bidi_fd_watcher : private dprivate::base_bidi_fd_watcher
 {
     template <typename, typename> friend class bidi_fd_watcher_impl;
 
@@ -1585,6 +1689,7 @@ class bidi_fd_watcher : private dprivate::base_bidi_fd_watcher<typename EventLoo
     void set_watch_enabled(EventLoop &eloop, bool in, bool b)
     {
         int events = in ? IN_EVENTS : OUT_EVENTS;
+        auto orig_flags = this->watch_flags;
         
         if (b) {
             this->watch_flags |= events;
@@ -1605,6 +1710,13 @@ class bidi_fd_watcher : private dprivate::base_bidi_fd_watcher<typename EventLoo
                         (this->watch_flags & IO_EVENTS) != 0);
             }
         }
+        else {
+            // emulation: if enabling a previously disabled watcher, must queue now:
+            if (b && (orig_flags != this->watch_flags)) {
+                this->watch_flags = orig_flags;
+                loop_access::requeue_watcher(eloop, watcher);
+            }
+        }
 
         if (! b) {
             eloop.dequeue_watcher(watcher);
@@ -1745,11 +1857,11 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher<EventLoop>
     {
         EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
         this->emulate_enabled = false;
-        loop.get_base_lock().unlock();
+        loop_access::get_base_lock(loop).unlock();
 
         auto rearm_type = static_cast<Derived *>(this)->read_ready(loop, this->watch_fd);
 
-        loop.get_base_lock().lock();
+        loop_access::get_base_lock(loop).lock();
 
         if (rearm_type != rearm::REMOVED) {
             this->event_flags &= ~IN_EVENTS;
@@ -1759,7 +1871,7 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher<EventLoop>
                 rearm_type = rearm::REMOVE;
             }
 
-            rearm_type = loop.process_fd_rearm(this, rearm_type, true);
+            rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type, true);
 
             post_dispatch(loop, this, rearm_type);
         }
@@ -1770,11 +1882,11 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher<EventLoop>
         auto &outwatcher = bidi_fd_watcher<EventLoop>::out_watcher;
 
         EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
-        loop.get_base_lock().unlock();
+        loop_access::get_base_lock(loop).unlock();
 
         auto rearm_type = static_cast<Derived *>(this)->write_ready(loop, this->watch_fd);
 
-        loop.get_base_lock().lock();
+        loop_access::get_base_lock(loop).lock();
 
         if (rearm_type != rearm::REMOVED) {
             this->event_flags &= ~OUT_EVENTS;
@@ -1784,7 +1896,7 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher<EventLoop>
                 rearm_type = rearm::REMOVE;
             }
 
-            rearm_type = loop.process_secondary_rearm(this, &outwatcher, rearm_type);
+            rearm_type = loop_access::process_secondary_rearm(loop, this, &outwatcher, rearm_type);
 
             if (rearm_type == rearm::REQUEUE) {
                 post_dispatch(loop, &outwatcher, rearm_type);
@@ -1798,7 +1910,7 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher<EventLoop>
 
 // Child process event watcher
 template <typename EventLoop>
-class child_proc_watcher : private dprivate::base_child_watcher<typename EventLoop::mutex_t>
+class child_proc_watcher : private dprivate::base_child_watcher
 {
     template <typename, typename> friend class child_proc_watcher_impl;
 
@@ -1975,11 +2087,11 @@ class child_proc_watcher_impl : public child_proc_watcher<EventLoop>
     void dispatch(void *loop_ptr) noexcept override
     {
         EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
-        loop.get_base_lock().unlock();
+        loop_access::get_base_lock(loop).unlock();
 
         auto rearm_type = static_cast<Derived *>(this)->status_change(loop, this->watch_pid, this->child_status);
 
-        loop.get_base_lock().lock();
+        loop_access::get_base_lock(loop).lock();
 
         if (rearm_type != rearm::REMOVED) {
 
@@ -1989,7 +2101,7 @@ class child_proc_watcher_impl : public child_proc_watcher<EventLoop>
                 rearm_type = rearm::REMOVE;
             }
 
-            loop.process_child_watch_rearm(this, rearm_type);
+            loop_access::process_child_watch_rearm(loop, this, rearm_type);
 
             // rearm_type = loop.process??;
             post_dispatch(loop, this, rearm_type);
@@ -1998,10 +2110,10 @@ class child_proc_watcher_impl : public child_proc_watcher<EventLoop>
 };
 
 template <typename EventLoop>
-class timer : private base_timer_watcher<typename EventLoop::mutex_t>
+class timer : private base_timer_watcher
 {
     template <typename, typename> friend class timer_impl;
-    using base_t = base_timer_watcher<typename EventLoop::mutex_t>;
+    using base_t = base_timer_watcher;
     using mutex_t = typename EventLoop::mutex_t;
 
     public:
@@ -2106,13 +2218,13 @@ class timer_impl : public timer<EventLoop>
     void dispatch(void *loop_ptr) noexcept override
     {
         EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
-        loop.get_base_lock().unlock();
+        loop_access::get_base_lock(loop).unlock();
 
         auto intervals_report = this->intervals;
         this->intervals = 0;
         auto rearm_type = static_cast<Derived *>(this)->timer_expiry(loop, intervals_report);
 
-        loop.get_base_lock().lock();
+        loop_access::get_base_lock(loop).lock();
 
         if (rearm_type != rearm::REMOVED) {
 
@@ -2122,7 +2234,7 @@ class timer_impl : public timer<EventLoop>
                 rearm_type = rearm::REMOVE;
             }
 
-            loop.process_timer_rearm(this, rearm_type);
+            loop_access::process_timer_rearm(loop, this, rearm_type);
 
             post_dispatch(loop, this, rearm_type);
         }