Update bundled Dasynq (to 1.1.0).
authorDavin McCall <davmac@davmac.org>
Sat, 3 Feb 2018 19:20:16 +0000 (19:20 +0000)
committerDavin McCall <davmac@davmac.org>
Sat, 3 Feb 2018 19:20:16 +0000 (19:20 +0000)
src/dasynq/dasynq-childproc.h
src/dasynq/dasynq-epoll.h
src/dasynq/dasynq-interrupt.h
src/dasynq/dasynq-kqueue.h
src/dasynq/dasynq-posixtimer.h
src/dasynq/dasynq-select.h [new file with mode: 0644]
src/dasynq/dasynq-svec.h
src/dasynq/dasynq-timerfd.h
src/dasynq/dasynq.h

index f3f5e7ff98ea93a539b419855e32ea4dbfe2a47a..c2da99c89d50b679ddf685bd6462c17247111e58 100644 (file)
@@ -195,7 +195,9 @@ template <class Base> class child_proc_events : public Base
         sigemptyset(&chld_action.sa_mask);
         chld_action.sa_flags = 0;
         sigaction(SIGCHLD, &chld_action, nullptr);
-        loop_mech->add_signal_watch(SIGCHLD, nullptr);
+
+        // Specify a dummy user data value - sigchld_handler
+        loop_mech->add_signal_watch(SIGCHLD, (void *) dprivate::sigchld_handler);
         Base::init(loop_mech);
     }
 };
index 0cc5466473c6606cc3922343d4ba1f970d3087cc..84b0377a03134641ad1c020f5db0e2e2c1a6132d 100644 (file)
@@ -68,6 +68,9 @@ class epoll_traits
         // Epoll doesn't return the file descriptor (it can, but it can't return both file
         // descriptor and user data).
         int fd;
+
+        public:
+        fd_s(int fd_p) noexcept : fd(fd_p) { }
     };
 
     // File descriptor reference (passed to event callback). If the mechanism can return the
@@ -75,14 +78,17 @@ class epoll_traits
     // must be stored in an fd_s instance.
     class fd_r {
         public:
-        int getFd(fd_s ss)
+        int get_fd(fd_s ss)
         {
             return ss.fd;
         }
     };
     
-    const static bool has_bidi_fd_watch = true;
-    const static bool has_separate_rw_fd_watches = false;
+    constexpr static bool has_bidi_fd_watch = true;
+    constexpr static bool has_separate_rw_fd_watches = false;
+    constexpr static bool interrupt_after_fd_add = false;
+    constexpr static bool interrupt_after_signal_add = false;
+    constexpr static bool supports_non_oneshot_fd = true;
 };
 
 
@@ -132,7 +138,10 @@ template <class Base> class epoll_loop : public Base
                 (events[i].events & EPOLLHUP) && (flags |= IN_EVENTS);
                 (events[i].events & EPOLLOUT) && (flags |= OUT_EVENTS);
                 (events[i].events & EPOLLERR) && (flags |= IN_EVENTS | OUT_EVENTS | ERR_EVENTS);
-                Base::receive_fd_event(*this, fd_r(), ptr, flags);
+                auto r = Base::receive_fd_event(*this, fd_r(), ptr, flags);
+                if (std::get<0>(r) != 0) {
+                    enable_fd_watch_nolock(fd_r().get_fd(std::get<1>(r)), ptr, std::get<0>(r));
+                }
             }            
         }
     }
index 07ee12d6d8f4be4a66ff75b5e37dc8b62cf880fb..55e233d152b7988c65ac42f4a1907f7674e84ac7 100644 (file)
@@ -61,15 +61,24 @@ template <typename Base, typename Mutex> class interrupt_channel : public Base
     }
 
     template <typename T>
-    void receive_fd_event(T &loop_mech, typename Base::traits_t::fd_r fd_r_a, void * userdata, int flags)
+    std::tuple<int, typename Base::traits_t::fd_s>
+    receive_fd_event(T &loop_mech, typename Base::traits_t::fd_r fd_r_a, void * userdata, int flags)
     {
         if (userdata == &pipe_r_fd) {
             // try to clear the pipe
             char buf[64];
             read(pipe_r_fd, buf, 64);
+            if (Base::traits_t::supports_non_oneshot_fd) {
+                // If the loop mechanism actually persists none-oneshot marked watches, we don't need
+                // to re-enable:
+                return std::make_tuple(0, typename Base::traits_t::fd_s(pipe_r_fd));
+            }
+            else {
+                return std::make_tuple(IN_EVENTS, typename Base::traits_t::fd_s(pipe_r_fd));
+            }
         }
         else {
-            Base::receive_fd_event(loop_mech, fd_r_a, userdata, flags);
+            return Base::receive_fd_event(loop_mech, fd_r_a, userdata, flags);
         }
     }
 
index b475f00f275fc4aaa563f9a96e1bd93142cb083e..b2f03b4741ae74f901cdcabc90f7e7b767c653b6 100644 (file)
@@ -1,8 +1,8 @@
 #include <system_error>
-#include <mutex>
 #include <type_traits>
 #include <unordered_map>
 #include <vector>
+#include <tuple>
 
 #include <sys/event.h>
 #include <sys/time.h>
@@ -72,6 +72,9 @@ class kqueue_traits
     // File descriptor optional storage. If the mechanism can return the file descriptor, this
     // class will be empty, otherwise it can hold a file descriptor.
     class fd_s {
+        public:
+        fd_s(int) { }
+
         DASYNQ_EMPTY_BODY
     };
 
@@ -90,8 +93,11 @@ class kqueue_traits
         }
     };
     
-    const static bool has_bidi_fd_watch = false;
-    const static bool has_separate_rw_fd_watches = true;
+    constexpr static bool has_bidi_fd_watch = false;
+    constexpr static bool has_separate_rw_fd_watches = true;
+    constexpr static bool interrupt_after_fd_add = false;
+    constexpr static bool interrupt_after_signal_add = false;
+    constexpr static bool supports_non_oneshot_fd = false;
 };
 
 #if _POSIX_REALTIME_SIGNALS > 0
@@ -213,10 +219,15 @@ template <class Base> class kqueue_loop : public Base
             }
             else if (events[i].filter == EVFILT_READ || events[i].filter == EVFILT_WRITE) {
                 int flags = events[i].filter == EVFILT_READ ? IN_EVENTS : OUT_EVENTS;
-                Base::receive_fd_event(*this, fd_r(events[i].ident), events[i].udata, flags);
-                events[i].flags = EV_DISABLE | EV_CLEAR;
-                // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for
-                // another connection).
+                auto r = Base::receive_fd_event(*this, fd_r(events[i].ident), events[i].udata, flags);
+                if (std::get<0>(r) == 0) {
+                    // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for
+                    // another connection).
+                    events[i].flags = EV_DISABLE | EV_CLEAR;
+                }
+                else {
+                    events[i].flags = EV_ENABLE;
+                }
             }
             else {
                 events[i].flags = EV_DISABLE;
index 3473330a01ad7a0cce8ef7e80b7ffe837c05ca99..2715c3d6796b3a0a6dc4fe237a72a212d8eb213c 100644 (file)
@@ -117,11 +117,9 @@ template <class Base> class posix_timer_events : public timer_base<Base>
 
     // starts (if not started) a timer to timeout at the given time. Resets the expiry count to 0.
     //   enable: specifies whether to enable reporting of timeouts/intervals
-    void set_timer(timer_handle_t &timer_id, time_val &timeouttv, struct timespec &interval,
+    void set_timer(timer_handle_t &timer_id, const timespec &timeout, const timespec &interval,
             bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
     {
-        timespec timeout = timeouttv;
-
         std::lock_guard<decltype(Base::lock)> guard(Base::lock);
 
         timer_queue_t &timer_queue = this->queue_for_clock(clock);
@@ -146,12 +144,9 @@ template <class Base> class posix_timer_events : public timer_base<Base>
     }
 
     // Set timer relative to current time:
-    void set_timer_rel(timer_handle_t &timer_id, const time_val &timeouttv, const time_val &intervaltv,
+    void set_timer_rel(timer_handle_t &timer_id, const timespec &timeout, const timespec &interval,
             bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
     {
-        timespec timeout = timeouttv;
-        timespec interval = intervaltv;
-
         // TODO consider caching current time somehow; need to decide then when to update cached value.
         struct timespec curtime;
         int posix_clock_id = (clock == clock_type::MONOTONIC) ? CLOCK_MONOTONIC : CLOCK_REALTIME;
diff --git a/src/dasynq/dasynq-select.h b/src/dasynq/dasynq-select.h
new file mode 100644 (file)
index 0000000..a10e81a
--- /dev/null
@@ -0,0 +1,448 @@
+#include <system_error>
+#include <vector>
+#include <atomic>
+
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <sys/stat.h>
+#include <sys/select.h>
+
+#include <unistd.h>
+#include <signal.h>
+#include <setjmp.h>
+
+#include "dasynq-config.h"
+
+// "pselect"-based event loop mechanism.
+//
+
+namespace dasynq {
+
+template <class Base> class select_events;
+
+class select_traits
+{
+    public:
+
+    class sigdata_t
+    {
+        template <class Base> friend class select_events;
+
+        siginfo_t info;
+
+        public:
+        // mandatory:
+        int get_signo() { return info.si_signo; }
+        int get_sicode() { return info.si_code; }
+        pid_t get_sipid() { return info.si_pid; }
+        uid_t get_siuid() { return info.si_uid; }
+        void * get_siaddr() { return info.si_addr; }
+        int get_sistatus() { return info.si_status; }
+        int get_sival_int() { return info.si_value.sival_int; }
+        void * get_sival_ptr() { return info.si_value.sival_ptr; }
+
+        // XSI
+        int get_sierrno() { return info.si_errno; }
+
+        // XSR (streams) OB (obselete)
+#if !defined(__OpenBSD__)
+        // Note: OpenBSD doesn't have this; most other systems do. Technically it is part of the STREAMS
+        // interface.
+        int get_siband() { return info.si_band; }
+#endif
+
+        void set_signo(int signo) { info.si_signo = signo; }
+    };
+
+    class fd_r;
+
+    // File descriptor optional storage. If the mechanism can return the file descriptor, this
+    // class will be empty, otherwise it can hold a file descriptor.
+    class fd_s {
+        public:
+        fd_s(int fd) noexcept { }
+
+        DASYNQ_EMPTY_BODY
+    };
+
+    // File descriptor reference (passed to event callback). If the mechanism can return the
+    // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor
+    // must be stored in an fd_s instance.
+    class fd_r {
+        int fd;
+        public:
+        int getFd(fd_s ss)
+        {
+            return fd;
+        }
+        fd_r(int nfd) : fd(nfd)
+        {
+        }
+    };
+
+    constexpr static bool has_bidi_fd_watch = false;
+    constexpr static bool has_separate_rw_fd_watches = true;
+    // requires interrupt after adding/enabling an fd:
+    constexpr static bool interrupt_after_fd_add = true;
+    constexpr static bool interrupt_after_signal_add = true;
+    constexpr static bool supports_non_oneshot_fd = false;
+};
+
+namespace dprivate {
+namespace select_mech {
+
+// We need to declare and define a non-static data variable, "siginfo_p", in this header, without
+// violating the "one definition rule". The only way to do that is via a template, even though we
+// don't otherwise need a template here:
+template <typename T = decltype(nullptr)> class sig_capture_templ
+{
+    public:
+    static siginfo_t siginfo_cap;
+    static sigjmp_buf rjmpbuf;
+
+    static void signal_handler(int signo, siginfo_t *siginfo, void *v)
+    {
+        siginfo_cap = *siginfo;
+        siglongjmp(rjmpbuf, 1);
+    }
+};
+template <typename T> siginfo_t sig_capture_templ<T>::siginfo_cap;
+template <typename T> sigjmp_buf sig_capture_templ<T>::rjmpbuf;
+
+using sig_capture = sig_capture_templ<>;
+
+inline void prepare_signal(int signo)
+{
+    struct sigaction the_action;
+    the_action.sa_sigaction = sig_capture::signal_handler;
+    the_action.sa_flags = SA_SIGINFO;
+    sigfillset(&the_action.sa_mask);
+
+    sigaction(signo, &the_action, nullptr);
+}
+
+inline sigjmp_buf &get_sigreceive_jmpbuf()
+{
+    return sig_capture::rjmpbuf;
+}
+
+inline void unprep_signal(int signo)
+{
+    signal(signo, SIG_DFL);
+}
+
+inline siginfo_t * get_siginfo()
+{
+    return &sig_capture::siginfo_cap;
+}
+
+} } // namespace dasynq :: select_mech
+
+template <class Base> class select_events : public Base
+{
+    fd_set read_set;
+    fd_set write_set;
+    //fd_set error_set;  // logical OR of both the above
+    int max_fd = 0; // highest fd in any of the sets
+
+    sigset_t active_sigmask; // mask out unwatched signals i.e. active=0
+    void * sig_userdata[NSIG];
+
+    // userdata pointers in read and write respectively, for each fd:
+    std::vector<void *> rd_udata;
+    std::vector<void *> wr_udata;
+
+    // Base contains:
+    //   lock - a lock that can be used to protect internal structure.
+    //          receive*() methods will be called with lock held.
+    //   receive_signal(sigdata_t &, user *) noexcept
+    //   receive_fd_event(fd_r, user *, int flags) noexcept
+
+    using sigdata_t = select_traits::sigdata_t;
+    using fd_r = typename select_traits::fd_r;
+
+    void process_events(fd_set *read_set_p, fd_set *write_set_p, fd_set *error_set_p)
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
+        // Note: if error is set, report read-ready.
+
+        for (int i = 0; i <= max_fd; i++) {
+            if (FD_ISSET(i, read_set_p) || FD_ISSET(i, error_set_p)) {
+                if (FD_ISSET(i, &read_set) && rd_udata[i] != nullptr) {
+                    // report read
+                    auto r = Base::receive_fd_event(*this, fd_r(i), rd_udata[i], IN_EVENTS);
+                    if (std::get<0>(r) == 0) {
+                        FD_CLR(i, &read_set);
+                    }
+                }
+            }
+        }
+
+        for (int i = 0; i <= max_fd; i++) {
+            if (FD_ISSET(i, write_set_p)) {
+                if (FD_ISSET(i, &write_set) && wr_udata[i] != nullptr) {
+                    // report write
+                    auto r = Base::receive_fd_event(*this, fd_r(i), wr_udata[i], OUT_EVENTS);
+                    if (std::get<0>(r) == 0) {
+                        FD_CLR(i, &write_set);
+                    }
+                }
+            }
+        }
+    }
+
+    public:
+
+    /**
+     * select_events constructor.
+     *
+     * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
+     */
+    select_events()
+    {
+        FD_ZERO(&read_set);
+        FD_ZERO(&write_set);
+        sigfillset(&active_sigmask);
+        Base::init(this);
+    }
+
+    ~select_events()
+    {
+    }
+
+    //        fd:  file descriptor to watch
+    //  userdata:  data to associate with descriptor
+    //     flags:  IN_EVENTS | OUT_EVENTS | ONE_SHOT
+    //             (only one of IN_EVENTS/OUT_EVENTS can be specified)
+    // soft_fail:  true if unsupported file descriptors should fail by returning false instead
+    //             of throwing an exception
+    // returns: true on success; false if file descriptor type isn't supported and emulate == true
+    // throws:  std::system_error or std::bad_alloc on failure
+    bool add_fd_watch(int fd, void *userdata, int flags, bool enabled = true, bool soft_fail = false)
+    {
+        if (fd >= FD_SETSIZE) {
+            throw std::system_error(EMFILE, std::system_category());
+        }
+
+        if (flags & IN_EVENTS) {
+            FD_SET(fd, &read_set);
+            if (size_t(fd) >= rd_udata.size()) {
+                rd_udata.resize(fd + 1);
+            }
+            rd_udata[fd] = userdata;
+        }
+        else {
+            FD_SET(fd, &write_set);
+            if (size_t(fd) >= wr_udata.size()) {
+                wr_udata.resize(fd + 1);
+            }
+            wr_udata[fd] = userdata;
+        }
+
+        max_fd = std::max(fd, max_fd);
+
+        return true;
+    }
+
+    // returns: 0 on success
+    //          IN_EVENTS  if in watch requires emulation
+    //          OUT_EVENTS if out watch requires emulation
+    int add_bidi_fd_watch(int fd, void *userdata, int flags, bool emulate = false)
+    {
+        if (fd >= FD_SETSIZE) {
+            throw std::system_error(EMFILE, std::system_category());
+        }
+
+        if (flags & IN_EVENTS) {
+            FD_SET(fd, &read_set);
+            if (size_t(fd) >= rd_udata.size()) {
+                rd_udata.resize(fd + 1);
+            }
+            rd_udata[fd] = userdata;
+        }
+        if (flags & OUT_EVENTS) {
+            FD_SET(fd, &write_set);
+            if (size_t(fd) >= wr_udata.size()) {
+                wr_udata.resize(fd + 1);
+            }
+            wr_udata[fd] = userdata;
+        }
+
+        max_fd = std::max(fd, max_fd);
+
+        return 0;
+    }
+
+    // flags specifies which watch to remove; ignored if the loop doesn't support
+    // separate read/write watches.
+    void remove_fd_watch_nolock(int fd, int flags)
+    {
+        if (flags & IN_EVENTS) {
+            FD_CLR(fd, &read_set);
+            rd_udata[fd] = nullptr;
+        }
+        if (flags & OUT_EVENTS) {
+            FD_CLR(fd, &write_set);
+            wr_udata[fd] = nullptr;
+        }
+    }
+
+    void remove_fd_watch(int fd, int flags)
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        remove_fd_watch_nolock(fd, flags);
+    }
+
+    void remove_bidi_fd_watch(int fd) noexcept
+    {
+        FD_CLR(fd, &read_set);
+        FD_CLR(fd, &write_set);
+    }
+
+    void enable_fd_watch_nolock(int fd, void *userdata, int flags)
+    {
+        if (flags & IN_EVENTS) {
+            FD_SET(fd, &read_set);
+        }
+        else {
+            FD_SET(fd, &write_set);
+        }
+    }
+
+    void enable_fd_watch(int fd, void *userdata, int flags)
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        enable_fd_watch_nolock(fd, userdata, flags);
+    }
+
+    void disable_fd_watch_nolock(int fd, int flags)
+    {
+        if (flags & IN_EVENTS) {
+            FD_CLR(fd, &read_set);
+        }
+        else {
+            FD_CLR(fd, &write_set);
+        }
+    }
+
+    void disable_fd_watch(int fd, int flags)
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        disable_fd_watch_nolock(fd, flags);
+    }
+
+    // Note signal should be masked before call.
+    void add_signal_watch(int signo, void *userdata)
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        add_signal_watch_nolock(signo, userdata);
+    }
+
+    // Note signal should be masked before call.
+    void add_signal_watch_nolock(int signo, void *userdata)
+    {
+        sig_userdata[signo] = userdata;
+        sigdelset(&active_sigmask, signo);
+        dprivate::select_mech::prepare_signal(signo);
+    }
+
+    // Note, called with lock held:
+    void rearm_signal_watch_nolock(int signo, void *userdata) noexcept
+    {
+        sig_userdata[signo] = userdata;
+        sigdelset(&active_sigmask, signo);
+    }
+
+    void remove_signal_watch_nolock(int signo) noexcept
+    {
+        dprivate::select_mech::unprep_signal(signo);
+        sigaddset(&active_sigmask, signo);
+        sig_userdata[signo] = nullptr;
+        // No need to signal other threads
+    }
+
+    void remove_signal_watch(int signo) noexcept
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        remove_signal_watch_nolock(signo);
+    }
+
+    public:
+
+    // If events are pending, process an unspecified number of them.
+    // If no events are pending, wait until one event is received and
+    // process this event (and possibly any other events received
+    // simultaneously).
+    // If processing an event removes a watch, there is a possibility
+    // that the watched event will still be reported (if it has
+    // occurred) before pull_events() returns.
+    //
+    //  do_wait - if false, returns immediately if no events are
+    //            pending.
+    void pull_events(bool do_wait) noexcept
+    {
+        using namespace dprivate::select_mech;
+
+        struct timespec ts;
+        ts.tv_sec = 0;
+        ts.tv_nsec = 0;
+
+        fd_set read_set_c;
+        fd_set write_set_c;
+        fd_set err_set;
+
+        Base::lock.lock();
+        read_set_c = read_set;
+        write_set_c = write_set;
+        err_set = read_set;
+
+        sigset_t sigmask;
+        this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask);
+        // This is horrible, but hopefully will be optimised well. POSIX gives no way to combine signal
+        // sets other than this.
+        for (int i = 1; i < NSIG; i++) {
+            if (! sigismember(&active_sigmask, i)) {
+                sigdelset(&sigmask, i);
+            }
+        }
+        int nfds = max_fd + 1;
+        Base::lock.unlock();
+
+        volatile bool was_signalled = false;
+
+        // using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is
+        // received during polling, it will longjmp back to here:
+        if (sigsetjmp(get_sigreceive_jmpbuf(), 1) != 0) {
+            std::atomic_signal_fence(std::memory_order_acquire);
+            auto * sinfo = get_siginfo();
+            sigdata_t sigdata;
+            sigdata.info = *sinfo;
+            Base::lock.lock();
+            void *udata = sig_userdata[sinfo->si_signo];
+            if (udata != nullptr && Base::receive_signal(*this, sigdata, udata)) {
+                sigaddset(&sigmask, sinfo->si_signo);
+                sigaddset(&active_sigmask, sinfo->si_signo);
+            }
+            Base::lock.unlock();
+            was_signalled = true;
+        }
+
+        if (was_signalled) {
+            do_wait = false;
+        }
+
+        std::atomic_signal_fence(std::memory_order_release);
+
+        int r = pselect(nfds, &read_set_c, &write_set_c, &err_set, do_wait ? nullptr : &ts, &sigmask);
+        if (r == -1 || r == 0) {
+            // signal or no events
+            return;
+        }
+
+        process_events(&read_set_c, &write_set_c, &err_set);
+    }
+};
+
+} // end namespace
index d32190e86ccaa0fda8c9c51d1d8cdddc378e9cd4..b8cc7f78b6fb7984285a145dcc754482bc171343 100644 (file)
@@ -5,7 +5,11 @@
 #include <utility>
 #include <new>
 
-// Vector with possibility to shrink capacity arbitrarily
+// Vector with possibility to shrink capacity arbitrarily.
+//
+// The standard vector (std::vector) only allows shrinking a vector's capacity to its current size. In cases
+// where we need to keep some reserved capacity beyond the current size, we need an alternative solution: hence,
+// this class, svector.
 
 namespace dasynq {
 
@@ -13,7 +17,14 @@ template <typename T>
 class svector
 {
     private:
-    T * array;
+    union vec_node {
+        T elem;
+
+        vec_node() { }
+        ~vec_node() { }
+    };
+
+    vec_node * array;
     size_t size_v;
     size_t capacity_v;
 
@@ -22,10 +33,10 @@ class svector
         if (size_v == capacity_v) {
             // double capacity now:
             if (capacity_v == 0) capacity_v = 1;
-            T * new_array = new T[capacity_v * 2];
+            vec_node * new_array = new vec_node[capacity_v * 2];
             for (size_t i = 0; i < size_v; i++) {
-                new (&new_array[i]) T(std::move(array[i]));
-                array[i].T::~T();
+                new (&new_array[i].elem) T(std::move(array[i].elem));
+                array[i].elem.T::~T();
             }
             delete[] array;
             array = new_array;
@@ -47,14 +58,14 @@ class svector
         size_v = other.size_v;
         array = new T[capacity_v];
         for (size_t i = 0; i < size_v; i++) {
-            new (&array[i]) T(other[i]);
+            new (&array[i].elem) T(other[i].elem);
         }
     }
 
     ~svector()
     {
         for (size_t i = 0; i < size_v; i++) {
-            array[i].T::~T();
+            array[i].elem.T::~T();
         }
         delete[] array;
     }
@@ -62,14 +73,14 @@ class svector
     void push_back(const T &t)
     {
         check_capacity();
-        new (&array[size_v]) T(t);
+        new (&array[size_v].elem) T(t);
         size_v++;
     }
 
     void push_back(T &&t)
     {
         check_capacity();
-        new (&array[size_v]) T(t);
+        new (&array[size_v].elem) T(t);
         size_v++;
     }
 
@@ -77,7 +88,7 @@ class svector
     void emplace_back(U... args)
     {
         check_capacity();
-        new (&array[size_v]) T(args...);
+        new (&array[size_v].elem) T(args...);
         size_v++;
     }
 
@@ -88,12 +99,12 @@ class svector
 
     T &operator[](size_t index)
     {
-        return array[index];
+        return array[index].elem;
     }
 
     const T &operator[](size_t index) const
     {
-        return array[index];
+        return array[index].elem;
     }
 
     size_t size() const
@@ -124,10 +135,10 @@ class svector
     void reserve(size_t amount)
     {
         if (capacity_v < amount) {
-            T * new_array = new T[amount];
+            vec_node * new_array = new vec_node[amount];
             for (size_t i = 0; i < size_v; i++) {
-                new (&new_array[i]) T(std::move(array[i]));
-                array[i].T::~T();
+                new (&new_array[i].elem) T(std::move(array[i].elem));
+                array[i].elem.T::~T();
             }
             delete[] array;
             array = new_array;
@@ -138,13 +149,13 @@ class svector
     void shrink_to(size_t amount)
     {
         if (capacity_v > amount) {
-            T * new_array = new(std::nothrow) T[amount];
+            vec_node * new_array = new(std::nothrow) vec_node[amount];
             if (new_array == nullptr) {
                 return;
             }
             for (size_t i = 0; i < size_v; i++) {
-                new (&new_array[i]) T(std::move(array[i]));
-                array[i].T::~T();
+                new (&new_array[i].elem) T(std::move(array[i].elem));
+                array[i].elem.T::~T();
             }
             delete[] array;
             array = new_array;
@@ -154,27 +165,27 @@ class svector
 
     T &back()
     {
-        return array[size_v - 1];
+        return array[size_v - 1].elem;
     }
 
     T* begin()
     {
-        return array;
+        return reinterpret_cast<T *>(array);
     }
 
     const T *begin() const
     {
-        return array;
+        return reinterpret_cast<const T *>(array);
     }
 
     T* end()
     {
-        return array + size_v;
+        return reinterpret_cast<T *>(array + size_v);
     }
 
     const T *end() const
     {
-        return array + size_v;
+        return reinterpret_cast<const T *>(array + size_v);
     }
 };
 
index c3b465d4c886b57e825f093f5b43bfcfaa2735da..51bcdbe14fc99d324106bc8d61d559abb7c5a378 100644 (file)
@@ -96,16 +96,22 @@ template <class Base> class timer_fd_events : public timer_base<Base>
     };
 
     template <typename T>
-    void receive_fd_event(T &loop_mech, typename traits_t::fd_r fd_r_a, void * userdata, int flags)
+    std::tuple<int, typename traits_t::fd_s>
+    receive_fd_event(T &loop_mech, typename traits_t::fd_r fd_r_a, void * userdata, int flags)
     {
         if (userdata == &timerfd_fd) {
             process_timer(clock_type::MONOTONIC, timerfd_fd);
+            return std::make_tuple(IN_EVENTS, typename traits_t::fd_s(timerfd_fd));
         }
         else if (userdata == &systemtime_fd) {
             process_timer(clock_type::SYSTEM, systemtime_fd);
+            if (Base::traits_t::supports_non_oneshot_fd) {
+                return std::make_tuple(0, typename traits_t::fd_s(systemtime_fd));
+            }
+            return std::make_tuple(IN_EVENTS, typename traits_t::fd_s(systemtime_fd));
         }
         else {
-            Base::receive_fd_event(loop_mech, fd_r_a, userdata, flags);
+            return Base::receive_fd_event(loop_mech, fd_r_a, userdata, flags);
         }
     }
 
index 08439b8a9d22106bf18aa94da0896478ec99c500..68175c4daa88e045cbe86f26bf4c31f5876db45f 100644 (file)
@@ -64,7 +64,23 @@ namespace dasynq {
     using loop_traits_t = epoll_traits;
 }
 #else
-#error No loop backened defined - see dasynq-config.h
+#include "dasynq-select.h"
+#if _POSIX_TIMERS > 0
+#include "dasynq-posixtimer.h"
+namespace dasynq {
+    template <typename T> using timer_events = posix_timer_events<T>;
+}
+#else
+#include "dasynq-itimer.h"
+namespace dasynq {
+    template <typename T> using timer_events = itimer_events<T>;
+}
+#endif
+#include "dasynq-childproc.h"
+namespace dasynq {
+    template <typename T> using loop_t = select_events<interrupt_channel<timer_events<child_proc_events<T>>>>;
+    using loop_traits_t = select_traits;
+}
 #endif
 
 #include <atomic>
@@ -320,12 +336,17 @@ namespace dprivate {
             return true;
         }
         
+        // Receive fd event delivered from backend mechansim. Returns the desired watch mask, as per
+        // set_fd_enabled, which can be used to leave the watch disabled, re-enable it or re-enable
+        // one direction of a bi-directional watcher.
         template <typename T>
-        void receive_fd_event(T &loop_mech, typename Traits::fd_r fd_r, void * userdata, int flags) noexcept
+        std::tuple<int, typename Traits::fd_s> receive_fd_event(T &loop_mech, typename Traits::fd_r fd_r,
+                void * userdata, int flags) noexcept
         {
             base_fd_watcher * bfdw = static_cast<base_fd_watcher *>(userdata);
             
             bfdw->event_flags |= flags;
+            typename Traits::fd_s watch_fd_s {bfdw->watch_fd};
             
             base_watcher * bwatcher = bfdw;
             
@@ -345,17 +366,20 @@ namespace dprivate {
 
             queue_watcher(bwatcher);
             
-            if (! traits_t::has_separate_rw_fd_watches) {
+            if (is_multi_watch && ! traits_t::has_separate_rw_fd_watches) {
                 // If this is a bidirectional fd-watch, it has been disabled in *both* directions
                 // as the event was delivered. However, the other direction should not be disabled
                 // yet, so we need to re-enable:
                 int in_out_mask = IN_EVENTS | OUT_EVENTS;
-                if (is_multi_watch && (bfdw->watch_flags & in_out_mask) != 0) {
+                if ((bfdw->watch_flags & in_out_mask) != 0) {
                     // We need to re-enable the other channel now:
-                    loop_mech.enable_fd_watch_nolock(bfdw->watch_fd, userdata,
-                        (bfdw->watch_flags & in_out_mask) | ONE_SHOT);
+                    return std::make_tuple((bfdw->watch_flags & in_out_mask) | ONE_SHOT, watch_fd_s);
+                    // We are the polling thread: don't need to interrupt polling, even if it would
+                    // normally be required.
                 }
             }
+
+            return std::make_tuple(0, watch_fd_s);
         }
         
         // Child process terminated. Called with both the main lock and the reaper lock held.
@@ -388,7 +412,7 @@ namespace dprivate {
             return r;
         }
         
-        // Queue a watcher for reomval, or issue "removed" callback to it.
+        // Queue a watcher for removal, or issue "removed" callback to it.
         // Call with lock free.
         void issue_delete(base_watcher *watcher) noexcept
         {
@@ -545,8 +569,7 @@ class event_loop
     // - The mutex only protects manipulation of the wait queues, and so should not
     //   be highly contended.
     
-    mutex_t wait_lock;  // wait lock, used to prevent multiple threads from waiting
-                        // on the event queue simultaneously.
+    mutex_t wait_lock;  // protects the wait/attention queues
     waitqueue<mutex_t> attn_waitqueue;
     waitqueue<mutex_t> wait_waitqueue;
     
@@ -568,6 +591,9 @@ class event_loop
         loop_mech.prepare_watcher(callBack);
         try {
             loop_mech.add_signal_watch_nolock(signo, callBack);
+            if (backend_traits_t::interrupt_after_signal_add) {
+                interrupt_if_necessary();
+            }
         }
         catch (...) {
             loop_mech.release_watcher(callBack);
@@ -606,6 +632,9 @@ class event_loop
                     }
                 }
             }
+            else if (enabled && backend_traits_t::interrupt_after_fd_add) {
+                interrupt_if_necessary();
+            }
         }
         catch (...) {
             loop_mech.release_watcher(callback);
@@ -622,6 +651,7 @@ class event_loop
         try {
             loop_mech.prepare_watcher(&callback->out_watcher);
             try {
+                bool do_interrupt = false;
                 if (backend_traits_t::has_separate_rw_fd_watches) {
                     int r = loop_mech.add_bidi_fd_watch(fd, callback, eventmask | ONE_SHOT, emulate);
                     if (r & IN_EVENTS) {
@@ -630,12 +660,19 @@ class event_loop
                             requeue_watcher(callback);
                         }
                     }
+                    else if ((eventmask & IN_EVENTS) && backend_traits_t::interrupt_after_fd_add) {
+                        do_interrupt = true;
+                    }
+
                     if (r & OUT_EVENTS) {
                         callback->out_watcher.emulatefd = true;
                         if (eventmask & OUT_EVENTS) {
                             requeue_watcher(&callback->out_watcher);
                         }
                     }
+                    else if ((eventmask & OUT_EVENTS) && backend_traits_t::interrupt_after_fd_add) {
+                        do_interrupt = true;
+                    }
                 }
                 else {
                     if (! loop_mech.add_fd_watch(fd, callback, eventmask | ONE_SHOT, true, emulate)) {
@@ -648,6 +685,13 @@ class event_loop
                             requeue_watcher(&callback->out_watcher);
                         }
                     }
+                    else if (backend_traits_t::interrupt_after_fd_add) {
+                        do_interrupt = true;
+                    }
+                }
+
+                if (do_interrupt) {
+                    interrupt_if_necessary();
                 }
             }
             catch (...) {
@@ -665,6 +709,9 @@ class event_loop
     {
         if (enabled) {
             loop_mech.enable_fd_watch(fd, watcher, watch_flags | ONE_SHOT);
+            if (backend_traits_t::interrupt_after_fd_add) {
+                interrupt_if_necessary();
+            }
         }
         else {
             loop_mech.disable_fd_watch(fd, watch_flags);
@@ -675,6 +722,9 @@ class event_loop
     {
         if (enabled) {
             loop_mech.enable_fd_watch_nolock(fd, watcher, watch_flags | ONE_SHOT);
+            if (backend_traits_t::interrupt_after_fd_add) {
+                interrupt_if_necessary();
+            }
         }
         else {
             loop_mech.disable_fd_watch_nolock(fd, watch_flags);
@@ -874,6 +924,16 @@ class event_loop
         }
     }
 
+    // Interrupt the current poll-waiter, if necessary - that is, if the loop is multi-thread safe, and if
+    // there is currently another thread polling the backend event mechanism.
+    void interrupt_if_necessary()
+    {
+           std::lock_guard<mutex_t> guard(wait_lock);
+        if (! attn_waitqueue.is_empty()) {  // (always false for single-threaded loops)
+            loop_mech.interrupt_wait();
+        }
+    }
+
     // Acquire the attention lock (when held, ensures that no thread is polling the AEN
     // mechanism). This can be used to safely remove watches, since it is certain that
     // notification callbacks won't be run while the attention lock is held.
@@ -932,6 +992,9 @@ class event_loop
         // Called with lock held
         if (rearm_type == rearm::REARM) {
             loop_mech.rearm_signal_watch_nolock(bsw->siginfo.get_signo(), bsw);
+            if (backend_traits_t::interrupt_after_signal_add) {
+                interrupt_if_necessary();
+            }
         }
         else if (rearm_type == rearm::REMOVE) {
             loop_mech.remove_signal_watch_nolock(bsw->siginfo.get_signo());
@@ -963,7 +1026,8 @@ class event_loop
                         if (bdfw->watch_flags & IN_EVENTS) {
                             bdfw->watch_flags &= ~IN_EVENTS;
                             if (! emulatedfd) {
-                                loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags);
+                                set_fd_enabled_nolock(bdfw, bdfw->watch_fd, bdfw->watch_flags,
+                                        bdfw->watch_flags != 0);
                             }
                         }
                         return rearm::NOOP;
@@ -982,12 +1046,8 @@ class event_loop
 
                 if (! emulatedfd) {
                     if (! backend_traits_t::has_separate_rw_fd_watches) {
-                        int watch_flags = bdfw->watch_flags;
-                        // without separate r/w watches, enable_fd_watch actually sets
-                        // which sides are enabled (i.e. can be used to disable):
-                        loop_mech.enable_fd_watch_nolock(bdfw->watch_fd,
-                                static_cast<base_watcher *>(bdfw),
-                                (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+                        int watch_flags = bdfw->watch_flags  & (IN_EVENTS | OUT_EVENTS);
+                        set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags, watch_flags != 0);
                     }
                     else {
                         loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS);
@@ -1000,14 +1060,11 @@ class event_loop
                 if (! emulatedfd) {
                     if (! backend_traits_t::has_separate_rw_fd_watches) {
                         int watch_flags = bdfw->watch_flags;
-                        loop_mech.enable_fd_watch_nolock(bdfw->watch_fd,
-                                static_cast<base_watcher *>(bdfw),
-                                (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+                        set_fd_enabled_nolock(bdfw, bdfw->watch_fd,
+                                watch_flags & (IN_EVENTS | OUT_EVENTS), true);
                     }
                     else {
-                        loop_mech.enable_fd_watch_nolock(bdfw->watch_fd,
-                                static_cast<base_watcher *>(bdfw),
-                                IN_EVENTS | ONE_SHOT);
+                        set_fd_enabled_nolock(bdfw, bdfw->watch_fd, IN_EVENTS, true);
                     }
                 }
                 else {
@@ -1039,8 +1096,8 @@ class event_loop
                 }
             }
             else  if (rearm_type == rearm::REARM) {
-                loop_mech.enable_fd_watch_nolock(bfw->watch_fd, bfw,
-                        (bfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+                set_fd_enabled_nolock(bfw, bfw->watch_fd,
+                        bfw->watch_flags & (IN_EVENTS | OUT_EVENTS), true);
             }
             else if (rearm_type == rearm::DISARM) {
                 loop_mech.disable_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags);
@@ -1090,7 +1147,7 @@ class event_loop
                 if (! bdfw->read_removed) {
                     if (bdfw->watch_flags & OUT_EVENTS) {
                         bdfw->watch_flags &= ~OUT_EVENTS;
-                        loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags);
+                        set_fd_enabled_nolock(bdfw, bdfw->watch_fd, bdfw->watch_flags, true);
                     }
                     return rearm::NOOP;
                 }
@@ -1106,9 +1163,7 @@ class event_loop
 
             if (! backend_traits_t::has_separate_rw_fd_watches) {
                 int watch_flags = bdfw->watch_flags;
-                loop_mech.enable_fd_watch_nolock(bdfw->watch_fd,
-                        static_cast<base_watcher *>(bdfw),
-                        (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+                set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags & (IN_EVENTS | OUT_EVENTS), true);
             }
             else {
                 loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, OUT_EVENTS);
@@ -1119,14 +1174,10 @@ class event_loop
             
             if (! backend_traits_t::has_separate_rw_fd_watches) {
                 int watch_flags = bdfw->watch_flags;
-                loop_mech.enable_fd_watch_nolock(bdfw->watch_fd,
-                        static_cast<base_watcher *>(bdfw),
-                        (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+                set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags & (IN_EVENTS | OUT_EVENTS), true);
             }
             else {
-                loop_mech.enable_fd_watch_nolock(bdfw->watch_fd,
-                        static_cast<base_watcher *>(bdfw),
-                        OUT_EVENTS | ONE_SHOT);
+                set_fd_enabled_nolock(bdfw, bdfw->watch_fd, OUT_EVENTS | ONE_SHOT, true);
             }
         }
         return rearm_type;