Update bundled Dasynq to 1.1.4.
authorDavin McCall <davmac@davmac.org>
Fri, 6 Apr 2018 21:00:18 +0000 (22:00 +0100)
committerDavin McCall <davmac@davmac.org>
Fri, 6 Apr 2018 21:00:18 +0000 (22:00 +0100)
src/dasynq/dasynq-config.h
src/dasynq/dasynq-itimer.h
src/dasynq/dasynq-kqueue-macos.h
src/dasynq/dasynq-kqueue.h
src/dasynq/dasynq-posixtimer.h
src/dasynq/dasynq-pselect.h [new file with mode: 0644]
src/dasynq/dasynq-select.h
src/dasynq/dasynq-signal.h
src/dasynq/dasynq-timerbase.h
src/dasynq/dasynq.h

index b38b387e028d58e96401cd96149b16fd39902d11..0ee10f6d34f66c3766c53e94457ec056b7f27a38 100644 (file)
@@ -3,11 +3,13 @@
 
 // You can customise Dasynq's build options in this file. Typically, you won't need to do anything; the
 // defaults are sensible for a range of operating systems, though for some BSD family OSes you may need
-// to explicitly define DASYNQ_HAVE_KQUEUE to 1. Either kqueue or epoll must be available (i.e. either
-// DASYNQ_HAVE_KQUEUE or DASYNQ_HAVE_EPOLL need to be defined to 1). There are two parts to the file: the
-// first is the custom configuration section, where you may specify custom settings, and the second
-// section contains automatic configuration to fill in remaining settings based on known features in
-// certain operating systems and compilers.
+// to explicitly define DASYNQ_HAVE_KQUEUE to 1. If neither epoll nor kqueue are available, the select-
+// based backend is used, and DASYNQ_HAVE_PSELECT must be defined (to either 1 or 0, if pselect is or is
+// not available, respectively).
+
+// There are two parts to the file: the first is the custom configuration section, where you may specify
+// custom settings, and the second section contains automatic configuration to fill in remaining settings
+// based on known features in certain operating systems and compilers.
 
 // ---------------------------------------------------------------------------------------------------------
 // Part 1: Custom configuration, please edit to suit your system / requirements.
 //     #define DASYNQ_HAVE_KQUEUE 1
 //
 // If the epoll family of system calls are available:
-//     #define DASYNQ_HAVE_KQUEUE 1
+//     #define DASYNQ_HAVE_EPOLL 1
 //
 // If the pipe2 system call is available:
 //     #define HAVE_PIPE2 1
 //
+// If the pselect system call is available:
+//     #define HAVE_PSELECT 1
+//
 // A tag to include at the end of a class body for a class which is allowed to have zero size.
 // Normally, C++ mandates that all objects (except empty base subobjects) have non-zero size, but on some
 // compilers (at least GCC and LLVM-Clang) there are tricks to get around this awkward limitation. Note that
 #if ! defined(DASYNQ_HAVE_KQUEUE)
 #if defined(__OpenBSD__) || defined(__APPLE__) || defined(__FreeBSD__)
 #define DASYNQ_HAVE_KQUEUE 1
-#if defined(__APPLE__)
-// kqueue on macos has "issues". See extra/macos-kqueue-bug. There is an alternate Dasyqn kqueue backend
-// which avoids the issue, which is enabled via DASYNQ_KQUEUE_MACOS_WORKAROUND.
-#define DASYNQ_KQUEUE_MACOS_WORKAROUND 1
 #endif
 #endif
+
+#if DASYNQ_HAVE_KQUEUE && !defined(DASYNQ_KQUEUE_MACOS_WORKAROUND) && defined(__APPLE__)
+// kqueue on macos has "issues". See extra/macos-kqueue-bug. There is an alternate Dasynq kqueue backend
+// which avoids the issue, which is enabled via DASYNQ_KQUEUE_MACOS_WORKAROUND.
+#define DASYNQ_KQUEUE_MACOS_WORKAROUND 1
 #endif
 
-#if defined(__linux__)
 #if ! defined(DASYNQ_HAVE_EPOLL)
+#if defined(__linux__)
 #define DASYNQ_HAVE_EPOLL 1
 #endif
 #endif
 
+#if ! defined(DASYNQ_HAVE_PSELECT)
+#if defined(__sortix__)
+// Sortix doesn't have pselect yet (but has select):
+#define DASYNQ_HAVE_PSELECT 0
+#else
+// POSIX actually requires pselect, so we otherwise assume it's available:
+#define DASYNQ_HAVE_PSELECT 1
+#endif
+#endif
+
 // General feature availability
 
 #if (defined(__OpenBSD__) || defined(__linux__)) && ! defined(HAVE_PIPE2)
index 9ecf82831426f19e626db20f9e4b19d7056663ef..b004e9532920b84113cfa7b4cd673ffaf42d050f 100644 (file)
@@ -11,10 +11,13 @@ namespace dasynq {
 
 // Timer implementation based on the (basically obsolete) POSIX itimer interface.
 
-// With this timer implementation, we only use one clock, and allow no distinction between the
-// monotonic and system time.
+// With this timer implementation, we only have one real clock (the monotonic clock) that we can
+// run a timer against. However, if the system has both clocks, we still maintain two separate queues.
+// If provide_mono_timer is false we actually provide no system timer and rely on the event mechanism
+// which extends this class to measure time and run timeouts (via process_monotonic_timers functions).
 
-template <class Base> class itimer_events : public timer_base<Base>
+template <class Base, bool provide_mono_timer = true>
+class itimer_events : public timer_base<Base>
 {
     private:
     
@@ -94,31 +97,100 @@ template <class Base> class itimer_events : public timer_base<Base>
     bool receive_signal(T & loop_mech, sigdata_t &siginfo, void *userdata)
     {
         if (siginfo.get_signo() == SIGALRM) {
-            auto &timer_queue = this->queue_for_clock(clock_type::SYSTEM);
-            if (! timer_queue.empty()) {
-                struct timespec curtime;
-                timer_base<Base>::get_time(curtime, clock_type::SYSTEM, true);
-                timer_base<Base>::process_timer_queue(timer_queue, curtime);
+            process_timers();
+            return false; // don't disable signal watch
+        }
+        else {
+            return Base::receive_signal(loop_mech, siginfo, userdata);
+        }
+    }
+
+#ifdef CLOCK_MONOTONIC
+    // We need to override the monotonic timer processing functions from timer_base to check both
+    // timer queues, since we aren't actually providing a separate timer for the system clock:
+
+    inline void process_monotonic_timers(bool &do_wait, timeval &tv, timeval *&wait_tv)
+    {
+        // We need to process both timer queues and set tv/wait_tv according to which has the
+        // timer that will expire soonest:
+        timer_base<Base>::process_timers(clock_type::MONOTONIC, do_wait, tv, wait_tv);
+
+        if (! do_wait) {
+            timer_base<Base>::process_timers(clock_type::SYSTEM, do_wait, tv, wait_tv);
+        }
+        else {
+            timeval mono_tv = tv;
+            timer_base<Base>::process_timers(clock_type::SYSTEM, do_wait, tv, wait_tv);
+            if (mono_tv.tv_sec < tv.tv_sec ||
+                    (mono_tv.tv_sec == tv.tv_sec && mono_tv.tv_usec < tv.tv_usec)) {
+                tv = mono_tv;
             }
-            
+        }
+    }
+
+    inline void process_monotonic_timers(bool &do_wait, timespec &ts, timespec *&wait_ts)
+    {
+        // We need to process both timer queues and set tv/wait_tv according to which has the
+        // timer that will expire soonest:
+        timer_base<Base>::process_timers(clock_type::MONOTONIC, do_wait, ts, wait_ts);
+
+        if (! do_wait) {
+            timer_base<Base>::process_timers(clock_type::SYSTEM, do_wait, ts, wait_ts);
+        }
+        else {
+            timespec mono_ts = ts;
+            timer_base<Base>::process_timers(clock_type::SYSTEM, do_wait, ts, wait_ts);
+            ts = std::min(time_val(ts), time_val(mono_ts));
+        }
+    }
+
+    // Process monotonic timers based on the current clock time. However, we shadow the
+    // function and provide an implementation which also processes the system clock timer queue.
+    inline void process_monotonic_timers()
+    {
+        auto &mono_timer_queue = this->queue_for_clock(clock_type::MONOTONIC);
+        if (! mono_timer_queue.empty()) {
+            struct timespec curtime_mono;
+            timer_base<Base>::get_time(curtime_mono, clock_type::MONOTONIC, true);
+            timer_base<Base>::process_timer_queue(mono_timer_queue, curtime_mono);
+        }
+
+        auto &sys_timer_queue = this->queue_for_clock(clock_type::SYSTEM);
+        if (! sys_timer_queue.empty()) {
+            struct timespec curtime_sys;
+            timer_base<Base>::get_time(curtime_sys, clock_type::SYSTEM, true);
+            timer_base<Base>::process_timer_queue(sys_timer_queue, curtime_sys);
+        }
+    }
+
+#endif
+
+    void process_timers()
+    {
+        auto &timer_queue = this->queue_for_clock(clock_type::SYSTEM);
+        if (! timer_queue.empty()) {
+            struct timespec curtime;
+            timer_base<Base>::get_time(curtime, clock_type::SYSTEM, true);
+            timer_base<Base>::process_timer_queue(timer_queue, curtime);
+        }
+        
 #ifdef CLOCK_MONOTONIC
+        if (provide_mono_timer) {
             auto &mono_timer_queue = this->queue_for_clock(clock_type::MONOTONIC);
             if (! mono_timer_queue.empty()) {
                 struct timespec curtime_mono;
                 timer_base<Base>::get_time(curtime_mono, clock_type::MONOTONIC, true);
                 timer_base<Base>::process_timer_queue(mono_timer_queue, curtime_mono);
             }
+        }
 #endif
 
+        if (provide_mono_timer) {
             // arm alarm with timeout from head of queue
             set_timer_from_queue();
-            return false; // don't disable signal watch
-        }
-        else {
-            return Base::receive_signal(loop_mech, siginfo, userdata);
         }
     }
-        
+
     public:
 
     class traits_t : public Base::traits_t
@@ -128,11 +200,13 @@ template <class Base> class itimer_events : public timer_base<Base>
 
     template <typename T> void init(T *loop_mech)
     {
-        sigset_t sigmask;
-        this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask);
-        sigaddset(&sigmask, SIGALRM);
-        this->sigmaskf(SIG_SETMASK, &sigmask, nullptr);
-        loop_mech->add_signal_watch(SIGALRM, nullptr);
+        if (provide_mono_timer) {
+            sigset_t sigmask;
+            this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask);
+            sigaddset(&sigmask, SIGALRM);
+            this->sigmaskf(SIG_SETMASK, &sigmask, nullptr);
+            loop_mech->add_signal_watch(SIGALRM, this);
+        }
         Base::init(loop_mech);
     }
     
@@ -152,16 +226,22 @@ template <class Base> class itimer_events : public timer_base<Base>
         ts.expiry_count = 0;
         ts.enabled = enable;
 
+        bool do_set_timer;
         if (timer_queue.is_queued(timer_id)) {
             // Already queued; alter timeout
-            if (timer_queue.set_priority(timer_id, timeout)) {
-                set_timer_from_queue();
-            }
+            do_set_timer = timer_queue.set_priority(timer_id, timeout);
         }
         else {
-            if (timer_queue.insert(timer_id, timeout)) {
+            do_set_timer = timer_queue.insert(timer_id, timeout);
+        }
+
+        if (do_set_timer) {
+            if (provide_mono_timer) {
                 set_timer_from_queue();
             }
+            else {
+                this->interrupt_wait();
+            }
         }
     }
 
index 844a5f5ac3045e4261197573405e056888734daf..280c60cb71f8b2ab2464cf92bee0b2d69d8eb491 100644 (file)
@@ -70,72 +70,7 @@ class macos_kqueue_traits : public signal_traits
     constexpr static bool supports_non_oneshot_fd = false;
 };
 
-#if _POSIX_REALTIME_SIGNALS > 0
-static inline void prepare_signal(int signo) { }
-static inline void unprep_signal(int signo) { }
-
-inline bool get_siginfo(int signo, siginfo_t *siginfo)
-{
-    struct timespec timeout;
-    timeout.tv_sec = 0;
-    timeout.tv_nsec = 0;
-
-    sigset_t mask;
-    sigemptyset(&mask);
-    sigaddset(&mask, signo);
-    return (sigtimedwait(&mask, siginfo, &timeout) != -1);
-}
-#else
-
-// If we have no sigtimedwait implementation, we have to retrieve signal data by establishing a
-// signal handler.
-
-// We need to declare and define a non-static data variable, "siginfo_p", in this header, without
-// violating the "one definition rule". The only way to do that is via a template, even though we
-// don't otherwise need a template here:
-template <typename T = decltype(nullptr)> class sig_capture_templ
-{
-    public:
-    static siginfo_t * siginfo_p;
-
-    static void signalHandler(int signo, siginfo_t *siginfo, void *v)
-    {
-        *siginfo_p = *siginfo;
-    }
-};
-template <typename T> siginfo_t * sig_capture_templ<T>::siginfo_p = nullptr;
-
-using sig_capture = sig_capture_templ<>;
-
-inline void prepare_signal(int signo)
-{
-    struct sigaction the_action;
-    the_action.sa_sigaction = sig_capture::signalHandler;
-    the_action.sa_flags = SA_SIGINFO;
-    sigfillset(&the_action.sa_mask);
-
-    sigaction(signo, &the_action, nullptr);
-}
-
-inline void unprep_signal(int signo)
-{
-    signal(signo, SIG_DFL);
-}
-
-inline bool get_siginfo(int signo, siginfo_t *siginfo)
-{
-    sig_capture::siginfo_p = siginfo;
-
-    sigset_t mask;
-    sigfillset(&mask);
-    sigdelset(&mask, signo);
-    sigsuspend(&mask);
-    return true;
-}
-
-#endif
-
-template <class Base> class macos_kqueue_loop : public signal_events<Base,true>
+template <class Base> class macos_kqueue_loop : public signal_events<Base, true>
 {
     int kqfd; // kqueue fd
 
@@ -181,7 +116,8 @@ template <class Base> class macos_kqueue_loop : public signal_events<Base,true>
             }
         }
 
-        // Now we disable all received events, to simulate EV_DISPATCH:
+        // Now we disable all received events, to simulate EV_DISPATCH. Note that EV_DISPATH is
+        // actually available on MacOS, but we can't use it due to the signal processing bug.
         kevent(kqfd, events, r, nullptr, 0, nullptr);
     }
 
@@ -402,35 +338,52 @@ template <class Base> class macos_kqueue_loop : public signal_events<Base,true>
         struct kevent events[16];
         struct timespec ts;
 
+        // wait_ts remains null for an infinite wait; it is later set to either a 0 timeout
+        // if do_wait is false (or if we otherwise won't wait due to events being detected
+        // early) or is set to an appropriate timeout for the next timer's timeout.
+        struct timespec *wait_ts = nullptr;
+
         Base::lock.lock();
-        sigset_t sigmask = this->get_active_sigmask();
-        Base::lock.unlock();
 
-        volatile bool was_signalled = false;
+        // Check whether any timers are pending, and what the next timeout is.
+        this->process_monotonic_timers(do_wait, ts, wait_ts);
+
+        const sigset_t &active_sigmask = this->get_active_sigmask();
+        Base::lock.unlock();
 
         // using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is
         // received during polling, it will longjmp back to here:
         if (sigsetjmp(this->get_sigreceive_jmpbuf(), 1) != 0) {
-            this->process_signal(sigmask);
-            was_signalled = true;
-        }
-
-        if (was_signalled) {
+            this->process_signal();
             do_wait = false;
         }
 
-        ts.tv_sec = 0;
-        ts.tv_nsec = 0;
+        if (! do_wait) {
+            ts.tv_sec = 0;
+            ts.tv_nsec = 0;
+            wait_ts = &ts;
+        }
 
         std::atomic_signal_fence(std::memory_order_release);
-        this->sigmaskf(SIG_UNBLOCK, &sigmask, nullptr);
-        int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts);
-        this->sigmaskf(SIG_BLOCK, &sigmask, nullptr);
+
+        // Run kevent with signals unmasked:
+        this->sigmaskf(SIG_UNBLOCK, &active_sigmask, nullptr);
+        int r = kevent(kqfd, nullptr, 0, events, 16, wait_ts);
+        this->sigmaskf(SIG_BLOCK, &active_sigmask, nullptr);
+
         if (r == -1 || r == 0) {
             // signal or no events
+            if (r == 0 && do_wait) {
+                // timeout:
+                Base::lock.lock();
+                this->process_monotonic_timers();
+                Base::lock.unlock();
+            }
             return;
         }
 
+        ts = time_val(0, 0);
+
         do {
             process_events(events, r);
             r = kevent(kqfd, nullptr, 0, events, 16, &ts);
index b2f03b4741ae74f901cdcabc90f7e7b767c653b6..0b1c0e5025d11db2e3eec1f8867a29c71f6f823c 100644 (file)
@@ -557,14 +557,37 @@ template <class Base> class kqueue_loop : public Base
         struct kevent events[16];
         struct timespec ts;
 
-        ts.tv_sec = 0;
-        ts.tv_nsec = 0;
-        int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts);
+        // wait_ts remains null for an infinite wait; it is later set to either a 0 timeout
+        // if do_wait is false (or if we otherwise won't wait due to events being detected
+        // early) or is set to an appropriate timeout for the next timer's timeout.
+        struct timespec *wait_ts = nullptr;
+
+        // Check whether any timers are pending, and what the next timeout is.
+        Base::lock.lock();
+        this->process_monotonic_timers(do_wait, ts, wait_ts);
+        Base::lock.unlock();
+
+        if (! do_wait) {
+            ts.tv_sec = 0;
+            ts.tv_nsec = 0;
+            wait_ts = &ts;
+        }
+
+        int r = kevent(kqfd, nullptr, 0, events, 16, wait_ts);
         if (r == -1 || r == 0) {
             // signal or no events
+            if (r == 0 && do_wait) {
+                // timeout:
+                Base::lock.lock();
+                this->process_monotonic_timers();
+                Base::lock.unlock();
+            }
             return;
         }
         
+        ts.tv_sec = 0;
+        ts.tv_nsec = 0;
+
         do {
             process_events(events, r);
             r = kevent(kqfd, nullptr, 0, events, 16, &ts);
index 2715c3d6796b3a0a6dc4fe237a72a212d8eb213c..17816c3b7fc43e0520dcdee440d2795e59e0a8e5 100644 (file)
@@ -13,7 +13,8 @@ namespace dasynq {
 // Timer implementation based on POSIX create_timer et al.
 // May require linking with -lrt
 
-template <class Base> class posix_timer_events : public timer_base<Base>
+template <class Base, bool provide_mono_timer = true>
+class posix_timer_events : public timer_base<Base>
 {
     private:
     timer_t real_timer;
@@ -56,7 +57,7 @@ template <class Base> class posix_timer_events : public timer_base<Base>
                 set_timer_from_queue(real_timer, real_timer_queue);
             }
 
-            if (! mono_timer_queue.empty()) {
+            if (! mono_timer_queue.empty() && provide_mono_timer) {
                 this->get_time(curtime, clock_type::MONOTONIC, true);
                 this->process_timer_queue(mono_timer_queue, curtime);
                 set_timer_from_queue(mono_timer, mono_timer_queue);
@@ -94,7 +95,7 @@ template <class Base> class posix_timer_events : public timer_base<Base>
         this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask);
         sigaddset(&sigmask, SIGALRM);
         this->sigmaskf(SIG_SETMASK, &sigmask, nullptr);
-        loop_mech->add_signal_watch(SIGALRM, nullptr);
+        loop_mech->add_signal_watch(SIGALRM, this);
 
         struct sigevent timer_sigevent;
         timer_sigevent.sigev_notify = SIGEV_SIGNAL;
@@ -103,7 +104,7 @@ template <class Base> class posix_timer_events : public timer_base<Base>
 
         // Create the timers; throw std::system_error if we can't.
         if (timer_create(CLOCK_REALTIME, &timer_sigevent, &real_timer) == 0) {
-            if (timer_create(CLOCK_MONOTONIC, &timer_sigevent, &mono_timer) != 0) {
+            if (provide_mono_timer && timer_create(CLOCK_MONOTONIC, &timer_sigevent, &mono_timer) != 0) {
                 timer_delete(real_timer);
                 throw std::system_error(errno, std::system_category());
             }
@@ -133,12 +134,16 @@ template <class Base> class posix_timer_events : public timer_base<Base>
         if (timer_queue.is_queued(timer_id)) {
             // Already queued; alter timeout
             if (timer_queue.set_priority(timer_id, timeout)) {
-                set_timer_from_queue(timer, timer_queue);
+                if (clock != clock_type::MONOTONIC || provide_mono_timer) {
+                    set_timer_from_queue(timer, timer_queue);
+                }
             }
         }
         else {
             if (timer_queue.insert(timer_id, timeout)) {
-                set_timer_from_queue(timer, timer_queue);
+                if (clock != clock_type::MONOTONIC || provide_mono_timer) {
+                    set_timer_from_queue(timer, timer_queue);
+                }
             }
         }
     }
@@ -147,10 +152,8 @@ template <class Base> class posix_timer_events : public timer_base<Base>
     void set_timer_rel(timer_handle_t &timer_id, const timespec &timeout, const timespec &interval,
             bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
     {
-        // TODO consider caching current time somehow; need to decide then when to update cached value.
         struct timespec curtime;
-        int posix_clock_id = (clock == clock_type::MONOTONIC) ? CLOCK_MONOTONIC : CLOCK_REALTIME;
-        clock_gettime(posix_clock_id, &curtime);
+        this->get_time(curtime, clock, false);
         curtime.tv_sec += timeout.tv_sec;
         curtime.tv_nsec += timeout.tv_nsec;
         if (curtime.tv_nsec > 1000000000) {
@@ -174,7 +177,7 @@ template <class Base> class posix_timer_events : public timer_base<Base>
         if (timer_queue.is_queued(timer_id)) {
             bool was_first = (&timer_queue.get_root()) == &timer_id;
             timer_queue.remove(timer_id);
-            if (was_first) {
+            if (was_first && (clock != clock_type::MONOTONIC || provide_mono_timer)) {
                 set_timer_from_queue(timer, timer_queue);
             }
         }
@@ -182,7 +185,9 @@ template <class Base> class posix_timer_events : public timer_base<Base>
 
     ~posix_timer_events()
     {
-        timer_delete(mono_timer);
+        if (provide_mono_timer) {
+            timer_delete(mono_timer);
+        }
         timer_delete(real_timer);
     }
 };
diff --git a/src/dasynq/dasynq-pselect.h b/src/dasynq/dasynq-pselect.h
new file mode 100644 (file)
index 0000000..f4a5157
--- /dev/null
@@ -0,0 +1,280 @@
+#include "dasynq-select.h"
+#include "dasynq-signal.h"
+
+namespace dasynq {
+
+template <class Base> class pselect_events : public signal_events<Base, false>
+{
+    fd_set read_set;
+    fd_set write_set;
+    //fd_set error_set;  // logical OR of both the above
+    int max_fd = 0; // highest fd in any of the sets
+
+    // userdata pointers in read and write respectively, for each fd:
+    std::vector<void *> rd_udata;
+    std::vector<void *> wr_udata;
+
+    // Base contains:
+    //   lock - a lock that can be used to protect internal structure.
+    //          receive*() methods will be called with lock held.
+    //   receive_signal(sigdata_t &, user *) noexcept
+    //   receive_fd_event(fd_r, user *, int flags) noexcept
+
+    using fd_r = typename select_traits::fd_r;
+
+    void process_events(fd_set *read_set_p, fd_set *write_set_p, fd_set *error_set_p)
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
+        // Note: if error is set, report read-ready.
+
+        for (int i = 0; i <= max_fd; i++) {
+            if (FD_ISSET(i, read_set_p) || FD_ISSET(i, error_set_p)) {
+                if (FD_ISSET(i, &read_set) && rd_udata[i] != nullptr) {
+                    // report read
+                    auto r = Base::receive_fd_event(*this, fd_r(i), rd_udata[i], IN_EVENTS);
+                    if (std::get<0>(r) == 0) {
+                        FD_CLR(i, &read_set);
+                    }
+                }
+            }
+        }
+
+        for (int i = 0; i <= max_fd; i++) {
+            if (FD_ISSET(i, write_set_p)) {
+                if (FD_ISSET(i, &write_set) && wr_udata[i] != nullptr) {
+                    // report write
+                    auto r = Base::receive_fd_event(*this, fd_r(i), wr_udata[i], OUT_EVENTS);
+                    if (std::get<0>(r) == 0) {
+                        FD_CLR(i, &write_set);
+                    }
+                }
+            }
+        }
+    }
+
+    public:
+
+    /**
+     * pselect_events constructor.
+     *
+     * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
+     */
+    pselect_events()
+    {
+        FD_ZERO(&read_set);
+        FD_ZERO(&write_set);
+        Base::init(this);
+    }
+
+    ~pselect_events()
+    {
+    }
+
+    //        fd:  file descriptor to watch
+    //  userdata:  data to associate with descriptor
+    //     flags:  IN_EVENTS | OUT_EVENTS | ONE_SHOT
+    //             (only one of IN_EVENTS/OUT_EVENTS can be specified)
+    // soft_fail:  true if unsupported file descriptors should fail by returning false instead
+    //             of throwing an exception
+    // returns: true on success; false if file descriptor type isn't supported and emulate == true
+    // throws:  std::system_error or std::bad_alloc on failure
+    bool add_fd_watch(int fd, void *userdata, int flags, bool enabled = true, bool soft_fail = false)
+    {
+        if (fd >= FD_SETSIZE) {
+            throw std::system_error(EMFILE, std::system_category());
+        }
+
+        if (flags & IN_EVENTS) {
+            FD_SET(fd, &read_set);
+            if (size_t(fd) >= rd_udata.size()) {
+                rd_udata.resize(fd + 1);
+            }
+            rd_udata[fd] = userdata;
+        }
+        else {
+            FD_SET(fd, &write_set);
+            if (size_t(fd) >= wr_udata.size()) {
+                wr_udata.resize(fd + 1);
+            }
+            wr_udata[fd] = userdata;
+        }
+
+        max_fd = std::max(fd, max_fd);
+
+        return true;
+    }
+
+    // returns: 0 on success
+    //          IN_EVENTS  if in watch requires emulation
+    //          OUT_EVENTS if out watch requires emulation
+    int add_bidi_fd_watch(int fd, void *userdata, int flags, bool emulate = false)
+    {
+        if (fd >= FD_SETSIZE) {
+            throw std::system_error(EMFILE, std::system_category());
+        }
+
+        if (flags & IN_EVENTS) {
+            FD_SET(fd, &read_set);
+            if (size_t(fd) >= rd_udata.size()) {
+                rd_udata.resize(fd + 1);
+            }
+            rd_udata[fd] = userdata;
+        }
+        if (flags & OUT_EVENTS) {
+            FD_SET(fd, &write_set);
+            if (size_t(fd) >= wr_udata.size()) {
+                wr_udata.resize(fd + 1);
+            }
+            wr_udata[fd] = userdata;
+        }
+
+        max_fd = std::max(fd, max_fd);
+
+        return 0;
+    }
+
+    // flags specifies which watch to remove; ignored if the loop doesn't support
+    // separate read/write watches.
+    void remove_fd_watch_nolock(int fd, int flags)
+    {
+        if (flags & IN_EVENTS) {
+            FD_CLR(fd, &read_set);
+            rd_udata[fd] = nullptr;
+        }
+        if (flags & OUT_EVENTS) {
+            FD_CLR(fd, &write_set);
+            wr_udata[fd] = nullptr;
+        }
+    }
+
+    void remove_fd_watch(int fd, int flags)
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        remove_fd_watch_nolock(fd, flags);
+    }
+
+    void remove_bidi_fd_watch(int fd) noexcept
+    {
+        FD_CLR(fd, &read_set);
+        FD_CLR(fd, &write_set);
+    }
+
+    void enable_fd_watch_nolock(int fd, void *userdata, int flags)
+    {
+        if (flags & IN_EVENTS) {
+            FD_SET(fd, &read_set);
+        }
+        else {
+            FD_SET(fd, &write_set);
+        }
+    }
+
+    void enable_fd_watch(int fd, void *userdata, int flags)
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        enable_fd_watch_nolock(fd, userdata, flags);
+    }
+
+    void disable_fd_watch_nolock(int fd, int flags)
+    {
+        if (flags & IN_EVENTS) {
+            FD_CLR(fd, &read_set);
+        }
+        else {
+            FD_CLR(fd, &write_set);
+        }
+    }
+
+    void disable_fd_watch(int fd, int flags)
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        disable_fd_watch_nolock(fd, flags);
+    }
+
+    // If events are pending, process an unspecified number of them.
+    // If no events are pending, wait until one event is received and
+    // process this event (and possibly any other events received
+    // simultaneously).
+    // If processing an event removes a watch, there is a possibility
+    // that the watched event will still be reported (if it has
+    // occurred) before pull_events() returns.
+    //
+    //  do_wait - if false, returns immediately if no events are
+    //            pending.
+    void pull_events(bool do_wait) noexcept
+    {
+        struct timespec ts;
+        struct timespec *wait_ts = nullptr;
+
+        Base::lock.lock();
+
+        // Check whether any timers are pending, and what the next timeout is.
+        this->process_monotonic_timers(do_wait, ts, wait_ts);
+
+        fd_set read_set_c;
+        fd_set write_set_c;
+        fd_set err_set;
+
+        read_set_c = read_set;
+        write_set_c = write_set;
+        err_set = read_set;
+
+        const sigset_t &active_sigmask = this->get_active_sigmask();
+
+        sigset_t sigmask;
+        this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask);
+
+        // This is horrible, but hopefully will be optimised well. POSIX gives no way to combine signal
+        // sets other than this.
+        for (int i = 1; i < NSIG; i++) {
+            if (! sigismember(&active_sigmask, i)) {
+                sigdelset(&sigmask, i);
+            }
+        }
+        int nfds = max_fd + 1;
+        Base::lock.unlock();
+
+        // using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is
+        // received during polling, it will longjmp back to here:
+        if (sigsetjmp(this->get_sigreceive_jmpbuf(), 1) != 0) {
+            this->process_signal(sigmask);
+            do_wait = false;
+        }
+
+        if (! do_wait) {
+            ts.tv_sec = 0;
+            ts.tv_nsec = 0;
+            wait_ts = &ts;
+        }
+
+        std::atomic_signal_fence(std::memory_order_release);
+
+        int r = pselect(nfds, &read_set_c, &write_set_c, &err_set, wait_ts, &sigmask);
+
+        if (r == -1 || r == 0) {
+            // signal or no events
+            if (r == 0) {
+                if (! do_wait) {
+                    // At least on Mac OS, pselect doesn't seem to give us a pending signal
+                    // if we have a zero timeout. Force detection using sigmask:
+                    sigset_t origmask;
+                    this->sigmaskf(SIG_SETMASK, &sigmask, &origmask);
+                    this->sigmaskf(SIG_SETMASK, &origmask, nullptr);
+                }
+
+                if (r == 0 && do_wait) {
+                    // timeout:
+                    Base::lock.lock();
+                    this->process_monotonic_timers();
+                    Base::lock.unlock();
+                }
+            }
+            return;
+        }
+
+        process_events(&read_set_c, &write_set_c, &err_set);
+    }
+};
+
+} // end namespace
index 29a1ae35b627ae17ea6af2734beac1f475825e9a..4c96e2cc86876fba39e5a2f9cadaabe8dfb2e120 100644 (file)
@@ -1,3 +1,6 @@
+#ifndef DAYSNQ_SELECT_INCLUDED
+#define DASYNQ_SELECT_INDCLUDED 1
+
 #include <system_error>
 #include <vector>
 #include <atomic>
@@ -60,7 +63,7 @@ class select_traits : public signal_traits
     constexpr static bool supports_non_oneshot_fd = false;
 };
 
-template <class Base> class select_events : public signal_events<Base>
+template <class Base> class select_events : public signal_events<Base, true>
 {
     fd_set read_set;
     fd_set write_set;
@@ -261,53 +264,56 @@ template <class Base> class select_events : public signal_events<Base>
     //            pending.
     void pull_events(bool do_wait) noexcept
     {
-        //using namespace dprivate::select_mech;
+        struct timeval ts;
+        struct timeval *wait_ts = nullptr;
+
+        Base::lock.lock();
 
-        struct timespec ts;
-        ts.tv_sec = 0;
-        ts.tv_nsec = 0;
+        // Check whether any timers are pending, and what the next timeout is.
+        // Check whether any timers are pending, and what the next timeout is.
+        this->process_monotonic_timers(do_wait, ts, wait_ts);
 
         fd_set read_set_c;
         fd_set write_set_c;
         fd_set err_set;
 
-        Base::lock.lock();
         read_set_c = read_set;
         write_set_c = write_set;
         err_set = read_set;
 
         const sigset_t &active_sigmask = this->get_active_sigmask();
 
-        sigset_t sigmask;
-        this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask);
-        // This is horrible, but hopefully will be optimised well. POSIX gives no way to combine signal
-        // sets other than this.
-        for (int i = 1; i < NSIG; i++) {
-            if (! sigismember(&active_sigmask, i)) {
-                sigdelset(&sigmask, i);
-            }
-        }
         int nfds = max_fd + 1;
         Base::lock.unlock();
 
-        volatile bool was_signalled = false;
-
         // using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is
         // received during polling, it will longjmp back to here:
         if (sigsetjmp(this->get_sigreceive_jmpbuf(), 1) != 0) {
-            this->process_signal(sigmask);
-            was_signalled = true;
+            this->process_signal();
+            do_wait = false;
         }
 
-        if (was_signalled) {
-            do_wait = false;
+        if (! do_wait) {
+            ts.tv_sec = 0;
+            ts.tv_usec = 0;
+            wait_ts = &ts;
         }
 
         std::atomic_signal_fence(std::memory_order_release);
 
-        int r = pselect(nfds, &read_set_c, &write_set_c, &err_set, do_wait ? nullptr : &ts, &sigmask);
+        this->sigmaskf(SIG_UNBLOCK, &active_sigmask, nullptr);
+        int r = select(nfds, &read_set_c, &write_set_c, &err_set, wait_ts);
+        this->sigmaskf(SIG_BLOCK, &active_sigmask, nullptr);
+
         if (r == -1 || r == 0) {
             // signal or no events
+            if (r == 0 && do_wait) {
+                // timeout:
+                Base::lock.lock();
+                this->process_monotonic_timers();
+                Base::lock.unlock();
+            }
+
             return;
         }
 
@@ -316,3 +322,5 @@ template <class Base> class select_events : public signal_events<Base>
 };
 
 } // end namespace
+
+#endif
index dc9a8404ef3be0ccf71fadd8328e9d1b87463697..7a9ddd495414428c15ba5734cd8badca74402c0d 100644 (file)
@@ -1,3 +1,6 @@
+#ifndef DASYNQ_SIGNAL_INCLUDED
+#define DASYNQ_SIGNAL_INCLUDED 1
+
 #include <atomic>
 
 #include <signal.h>
@@ -124,6 +127,9 @@ template <class Base, bool mask_enables = false> class signal_events : public Ba
         }
     }
 
+    // Get the active signal mask - identifying the set of signals which have an enabled watcher.
+    // if mask_enables is true, the returned set contains the active signals; otherwise, it
+    // contains all inactive signals.
     const sigset_t &get_active_sigmask()
     {
         return active_sigmask;
@@ -134,7 +140,8 @@ template <class Base, bool mask_enables = false> class signal_events : public Ba
         return dprivate::signal_mech::get_sigreceive_jmpbuf();
     }
 
-    void process_signal(sigset_t &sigmask)
+    // process a received signal
+    void process_signal()
     {
         using namespace dprivate::signal_mech;
         std::atomic_signal_fence(std::memory_order_acquire);
@@ -146,11 +153,34 @@ template <class Base, bool mask_enables = false> class signal_events : public Ba
         void *udata = sig_userdata[sinfo->si_signo];
         if (udata != nullptr && Base::receive_signal(*this, sigdata, udata)) {
             if (mask_enables) {
-                sigdelset(&sigmask, sinfo->si_signo);
                 sigdelset(&active_sigmask, sinfo->si_signo);
             }
             else {
+                sigaddset(&active_sigmask, sinfo->si_signo);
+            }
+        }
+        Base::lock.unlock();
+    }
+
+    // process a received signal, and update sigmask - which should reflect the inverse of the
+    // active signal mask.
+    void process_signal(sigset_t &sigmask)
+    {
+        using namespace dprivate::signal_mech;
+        std::atomic_signal_fence(std::memory_order_acquire);
+        auto * sinfo = get_siginfo();
+        sigdata_t sigdata;
+        sigdata.info = *sinfo;
+
+        Base::lock.lock();
+        void *udata = sig_userdata[sinfo->si_signo];
+        if (udata != nullptr && Base::receive_signal(*this, sigdata, udata)) {
+            if (mask_enables) {
                 sigaddset(&sigmask, sinfo->si_signo);
+                sigdelset(&active_sigmask, sinfo->si_signo);
+            }
+            else {
+                sigdelset(&sigmask, sinfo->si_signo);
                 sigaddset(&active_sigmask, sinfo->si_signo);
             }
         }
@@ -213,3 +243,5 @@ template <class Base, bool mask_enables = false> class signal_events : public Ba
 };
 
 }
+
+#endif
index 96cc5185fd5f5a83823284c44be52a895986dfe6..43d0007735a2f539236311a30167d21e02de17b5 100644 (file)
@@ -4,6 +4,8 @@
 #include <utility>
 #include <mutex>
 
+#include <time.h>
+
 #include "dasynq-daryheap.h"
 
 namespace dasynq {
@@ -302,6 +304,8 @@ template <typename Base> class timer_base : public Base
         return timer_queue.empty() && mono_timer_queue.empty();
     }
 #else
+    // If there is no monotonic clock, map both clock_type::MONOTONIC and clock_type::SYSTEM to a
+    // single clock (based on gettimeofday).
     protected:
     inline timer_queue_t &queue_for_clock(clock_type clock)
     {
@@ -315,9 +319,11 @@ template <typename Base> class timer_base : public Base
 #endif
 
     // For the specified timer queue, issue expirations for all timers set to expire on or before the given
-    // time (curtime). The timer queue must not be empty.
+    // time (curtime).
     void process_timer_queue(timer_queue_t &queue, const struct timespec &curtime) noexcept
     {
+        if (queue.empty()) return;
+
         // Peek timer queue; calculate difference between current time and timeout
         const time_val * timeout = &queue.get_root_priority();
         time_val curtime_tv = curtime;
@@ -366,6 +372,79 @@ template <typename Base> class timer_base : public Base
         }
     }
 
+    // Process timers based on the current clock time. If any timers have expired,
+    // set do_wait to false; otherwise, if any timers are pending, set ts to the delay before
+    // the next timer expires and set wait_ts to &ts.
+    // (If no timers are active, none of the output parameters are set).
+    inline void process_timers(clock_type clock, bool &do_wait, timespec &ts, timespec *&wait_ts)
+    {
+        timespec now;
+        auto &timer_q = this->queue_for_clock(clock);
+        this->get_time(now, clock, true);
+        if (! timer_q.empty()) {
+            const time_val &timeout = timer_q.get_root_priority();
+            if (timeout <= now) {
+                this->process_timer_queue(timer_q, now);
+                do_wait = false; // don't wait, we have events already
+            }
+            else if (do_wait) {
+                ts = (timeout - now);
+                wait_ts = &ts;
+            }
+        }
+    }
+
+    // Process timers based on the current clock time. If any timers have expired,
+    // set do_wait to false; otherwise, if any timers are pending, set tv to the delay before
+    // the next timer expires and set wait_tv to &tv.
+    // (If no timers are active, none of the output parameters are set).
+    inline void process_timers(clock_type clock, bool &do_wait, timeval &tv, timeval *&wait_tv)
+    {
+        timespec now;
+        auto &timer_q = this->queue_for_clock(clock);
+        this->get_time(now, clock, true);
+        if (! timer_q.empty()) {
+            const time_val &timeout = timer_q.get_root_priority();
+            if (timeout <= now) {
+                this->process_timer_queue(timer_q, now);
+                do_wait = false; // don't wait, we have events already
+            }
+            else if (do_wait) {
+                time_val delay = (timeout - now);
+                tv.tv_sec = delay.seconds();
+                tv.tv_usec = (delay.nseconds() + 999) / 1000;
+                wait_tv = &tv;
+            }
+        }
+    }
+
+    // Process monotonic timers based on the current clock time.
+    inline void process_monotonic_timers()
+    {
+        timespec now;
+        auto &timer_q = this->queue_for_clock(clock_type::MONOTONIC);
+        this->get_time(now, clock_type::MONOTONIC, true);
+        process_timer_queue(timer_q, now);
+    }
+
+    // Process monotonic timers based on the current clock time. If any timers have expired,
+    // set do_wait to false; otherwise, if any timers are pending, set ts to the delay before
+    // the next timer expires and set wait_ts to &ts.
+    // (If no timers are active, none of the output parameters are set).
+    inline void process_monotonic_timers(bool &do_wait, timespec &ts, timespec *&wait_ts)
+    {
+        process_timers(clock_type::MONOTONIC, do_wait, ts, wait_ts);
+    }
+
+    // Process monotonic timers based on the current clock time. If any timers have expired,
+    // set do_wait to false; otherwise, if any timers are pending, set ts to the delay before
+    // the next timer expires and set wait_ts to &ts.
+    // (If no timers are active, none of the output parameters are set).
+    inline void process_monotonic_timers(bool &do_wait, timeval &tv, timeval *&wait_tv)
+    {
+        process_timers(clock_type::MONOTONIC, do_wait, tv, wait_tv);
+    }
+
     public:
 
     void get_time(time_val &tv, clock_type clock, bool force_update) noexcept
index b8317406f0246927d2d308d1df07534d7d66b17d..97beecdd430fd015c52235a9ef744562f643f91b 100644 (file)
 #if _POSIX_TIMERS > 0
 #include "dasynq-posixtimer.h"
 namespace dasynq {
-    template <typename T> using timer_events = posix_timer_events<T>;
+    template <typename T, bool provide_mono_timer = true> using timer_events = posix_timer_events<T, provide_mono_timer>;
 }
 #else
 #include "dasynq-itimer.h"
 namespace dasynq {
-    template <typename T> using timer_events = itimer_events<T>;
+    template <typename T, bool provide_mono_timer = true> using timer_events = itimer_events<T, provide_mono_timer>;
 }
 #endif
 #endif
@@ -99,14 +99,14 @@ namespace dasynq {
 #include "dasynq-kqueue-macos.h"
 #include "dasynq-childproc.h"
 namespace dasynq {
-    template <typename T> using loop_t = macos_kqueue_loop<interrupt_channel<timer_events<child_proc_events<T>>>>;
+    template <typename T> using loop_t = macos_kqueue_loop<timer_events<child_proc_events<interrupt_channel<T>>, false>>;
     using loop_traits_t = macos_kqueue_traits;
 }
 #else
 #include "dasynq-kqueue.h"
 #include "dasynq-childproc.h"
 namespace dasynq {
-    template <typename T> using loop_t = kqueue_loop<interrupt_channel<timer_events<child_proc_events<T>>>>;
+    template <typename T> using loop_t = kqueue_loop<timer_events<child_proc_events<interrupt_channel<T>>, false>>;
     using loop_traits_t = kqueue_traits;
 }
 #endif
@@ -119,24 +119,21 @@ namespace dasynq {
     using loop_traits_t = epoll_traits;
 }
 #else
-#include "dasynq-select.h"
-#if _POSIX_TIMERS > 0
-#include "dasynq-posixtimer.h"
+#include "dasynq-childproc.h"
+#if DASYNQ_HAVE_PSELECT
+#include "dasynq-pselect.h"
 namespace dasynq {
-    template <typename T> using timer_events = posix_timer_events<T>;
+    template <typename T> using loop_t = pselect_events<timer_events<interrupt_channel<child_proc_events<T>>, false>>;
+    using loop_traits_t = select_traits;
 }
 #else
-#include "dasynq-itimer.h"
-namespace dasynq {
-    template <typename T> using timer_events = itimer_events<T>;
-}
-#endif
-#include "dasynq-childproc.h"
+#include "dasynq-select.h"
 namespace dasynq {
-    template <typename T> using loop_t = select_events<interrupt_channel<timer_events<child_proc_events<T>>>>;
+    template <typename T> using loop_t = select_events<timer_events<interrupt_channel<child_proc_events<T>>, false>>;
     using loop_traits_t = select_traits;
 }
 #endif
+#endif
 
 #include <atomic>
 #include <condition_variable>
@@ -2171,7 +2168,7 @@ class timer : private base_timer_watcher
 
     template <typename T>
     static timer<EventLoop> *add_timer(EventLoop &eloop, clock_type clock, bool relative,
-            struct timespec &timeout, struct timespec &interval, T watch_hndlr)
+            const timespec &timeout, const timespec &interval, T watch_hndlr)
     {
         class lambda_timer : public timer_impl<event_loop_t, lambda_timer>
         {