From 7774c56f4211db2c48d8dca118fd58fdfd75dd91 Mon Sep 17 00:00:00 2001 From: Davin McCall Date: Sat, 3 Feb 2018 19:20:16 +0000 Subject: [PATCH] Update bundled Dasynq (to 1.1.0). --- src/dasynq/dasynq-childproc.h | 4 +- src/dasynq/dasynq-epoll.h | 17 +- src/dasynq/dasynq-interrupt.h | 13 +- src/dasynq/dasynq-kqueue.h | 25 +- src/dasynq/dasynq-posixtimer.h | 9 +- src/dasynq/dasynq-select.h | 448 +++++++++++++++++++++++++++++++++ src/dasynq/dasynq-svec.h | 57 +++-- src/dasynq/dasynq-timerfd.h | 10 +- src/dasynq/dasynq.h | 119 ++++++--- 9 files changed, 622 insertions(+), 80 deletions(-) create mode 100644 src/dasynq/dasynq-select.h diff --git a/src/dasynq/dasynq-childproc.h b/src/dasynq/dasynq-childproc.h index f3f5e7f..c2da99c 100644 --- a/src/dasynq/dasynq-childproc.h +++ b/src/dasynq/dasynq-childproc.h @@ -195,7 +195,9 @@ template class child_proc_events : public Base sigemptyset(&chld_action.sa_mask); chld_action.sa_flags = 0; sigaction(SIGCHLD, &chld_action, nullptr); - loop_mech->add_signal_watch(SIGCHLD, nullptr); + + // Specify a dummy user data value - sigchld_handler + loop_mech->add_signal_watch(SIGCHLD, (void *) dprivate::sigchld_handler); Base::init(loop_mech); } }; diff --git a/src/dasynq/dasynq-epoll.h b/src/dasynq/dasynq-epoll.h index 0cc5466..84b0377 100644 --- a/src/dasynq/dasynq-epoll.h +++ b/src/dasynq/dasynq-epoll.h @@ -68,6 +68,9 @@ class epoll_traits // Epoll doesn't return the file descriptor (it can, but it can't return both file // descriptor and user data). int fd; + + public: + fd_s(int fd_p) noexcept : fd(fd_p) { } }; // File descriptor reference (passed to event callback). If the mechanism can return the @@ -75,14 +78,17 @@ class epoll_traits // must be stored in an fd_s instance. class fd_r { public: - int getFd(fd_s ss) + int get_fd(fd_s ss) { return ss.fd; } }; - const static bool has_bidi_fd_watch = true; - const static bool has_separate_rw_fd_watches = false; + constexpr static bool has_bidi_fd_watch = true; + constexpr static bool has_separate_rw_fd_watches = false; + constexpr static bool interrupt_after_fd_add = false; + constexpr static bool interrupt_after_signal_add = false; + constexpr static bool supports_non_oneshot_fd = true; }; @@ -132,7 +138,10 @@ template class epoll_loop : public Base (events[i].events & EPOLLHUP) && (flags |= IN_EVENTS); (events[i].events & EPOLLOUT) && (flags |= OUT_EVENTS); (events[i].events & EPOLLERR) && (flags |= IN_EVENTS | OUT_EVENTS | ERR_EVENTS); - Base::receive_fd_event(*this, fd_r(), ptr, flags); + auto r = Base::receive_fd_event(*this, fd_r(), ptr, flags); + if (std::get<0>(r) != 0) { + enable_fd_watch_nolock(fd_r().get_fd(std::get<1>(r)), ptr, std::get<0>(r)); + } } } } diff --git a/src/dasynq/dasynq-interrupt.h b/src/dasynq/dasynq-interrupt.h index 07ee12d..55e233d 100644 --- a/src/dasynq/dasynq-interrupt.h +++ b/src/dasynq/dasynq-interrupt.h @@ -61,15 +61,24 @@ template class interrupt_channel : public Base } template - void receive_fd_event(T &loop_mech, typename Base::traits_t::fd_r fd_r_a, void * userdata, int flags) + std::tuple + receive_fd_event(T &loop_mech, typename Base::traits_t::fd_r fd_r_a, void * userdata, int flags) { if (userdata == &pipe_r_fd) { // try to clear the pipe char buf[64]; read(pipe_r_fd, buf, 64); + if (Base::traits_t::supports_non_oneshot_fd) { + // If the loop mechanism actually persists none-oneshot marked watches, we don't need + // to re-enable: + return std::make_tuple(0, typename Base::traits_t::fd_s(pipe_r_fd)); + } + else { + return std::make_tuple(IN_EVENTS, typename 
Base::traits_t::fd_s(pipe_r_fd)); + } } else { - Base::receive_fd_event(loop_mech, fd_r_a, userdata, flags); + return Base::receive_fd_event(loop_mech, fd_r_a, userdata, flags); } } diff --git a/src/dasynq/dasynq-kqueue.h b/src/dasynq/dasynq-kqueue.h index b475f00..b2f03b4 100644 --- a/src/dasynq/dasynq-kqueue.h +++ b/src/dasynq/dasynq-kqueue.h @@ -1,8 +1,8 @@ #include -#include #include #include #include +#include #include #include @@ -72,6 +72,9 @@ class kqueue_traits // File descriptor optional storage. If the mechanism can return the file descriptor, this // class will be empty, otherwise it can hold a file descriptor. class fd_s { + public: + fd_s(int) { } + DASYNQ_EMPTY_BODY }; @@ -90,8 +93,11 @@ class kqueue_traits } }; - const static bool has_bidi_fd_watch = false; - const static bool has_separate_rw_fd_watches = true; + constexpr static bool has_bidi_fd_watch = false; + constexpr static bool has_separate_rw_fd_watches = true; + constexpr static bool interrupt_after_fd_add = false; + constexpr static bool interrupt_after_signal_add = false; + constexpr static bool supports_non_oneshot_fd = false; }; #if _POSIX_REALTIME_SIGNALS > 0 @@ -213,10 +219,15 @@ template class kqueue_loop : public Base } else if (events[i].filter == EVFILT_READ || events[i].filter == EVFILT_WRITE) { int flags = events[i].filter == EVFILT_READ ? IN_EVENTS : OUT_EVENTS; - Base::receive_fd_event(*this, fd_r(events[i].ident), events[i].udata, flags); - events[i].flags = EV_DISABLE | EV_CLEAR; - // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for - // another connection). + auto r = Base::receive_fd_event(*this, fd_r(events[i].ident), events[i].udata, flags); + if (std::get<0>(r) == 0) { + // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for + // another connection). + events[i].flags = EV_DISABLE | EV_CLEAR; + } + else { + events[i].flags = EV_ENABLE; + } } else { events[i].flags = EV_DISABLE; diff --git a/src/dasynq/dasynq-posixtimer.h b/src/dasynq/dasynq-posixtimer.h index 3473330..2715c3d 100644 --- a/src/dasynq/dasynq-posixtimer.h +++ b/src/dasynq/dasynq-posixtimer.h @@ -117,11 +117,9 @@ template class posix_timer_events : public timer_base // starts (if not started) a timer to timeout at the given time. Resets the expiry count to 0. // enable: specifies whether to enable reporting of timeouts/intervals - void set_timer(timer_handle_t &timer_id, time_val &timeouttv, struct timespec &interval, + void set_timer(timer_handle_t &timer_id, const timespec &timeout, const timespec &interval, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept { - timespec timeout = timeouttv; - std::lock_guard guard(Base::lock); timer_queue_t &timer_queue = this->queue_for_clock(clock); @@ -146,12 +144,9 @@ template class posix_timer_events : public timer_base } // Set timer relative to current time: - void set_timer_rel(timer_handle_t &timer_id, const time_val &timeouttv, const time_val &intervaltv, + void set_timer_rel(timer_handle_t &timer_id, const timespec &timeout, const timespec &interval, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept { - timespec timeout = timeouttv; - timespec interval = intervaltv; - // TODO consider caching current time somehow; need to decide then when to update cached value. struct timespec curtime; int posix_clock_id = (clock == clock_type::MONOTONIC) ? 
CLOCK_MONOTONIC : CLOCK_REALTIME; diff --git a/src/dasynq/dasynq-select.h b/src/dasynq/dasynq-select.h new file mode 100644 index 0000000..a10e81a --- /dev/null +++ b/src/dasynq/dasynq-select.h @@ -0,0 +1,448 @@ +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "dasynq-config.h" + +// "pselect"-based event loop mechanism. +// + +namespace dasynq { + +template class select_events; + +class select_traits +{ + public: + + class sigdata_t + { + template friend class select_events; + + siginfo_t info; + + public: + // mandatory: + int get_signo() { return info.si_signo; } + int get_sicode() { return info.si_code; } + pid_t get_sipid() { return info.si_pid; } + uid_t get_siuid() { return info.si_uid; } + void * get_siaddr() { return info.si_addr; } + int get_sistatus() { return info.si_status; } + int get_sival_int() { return info.si_value.sival_int; } + void * get_sival_ptr() { return info.si_value.sival_ptr; } + + // XSI + int get_sierrno() { return info.si_errno; } + + // XSR (streams) OB (obselete) +#if !defined(__OpenBSD__) + // Note: OpenBSD doesn't have this; most other systems do. Technically it is part of the STREAMS + // interface. + int get_siband() { return info.si_band; } +#endif + + void set_signo(int signo) { info.si_signo = signo; } + }; + + class fd_r; + + // File descriptor optional storage. If the mechanism can return the file descriptor, this + // class will be empty, otherwise it can hold a file descriptor. + class fd_s { + public: + fd_s(int fd) noexcept { } + + DASYNQ_EMPTY_BODY + }; + + // File descriptor reference (passed to event callback). If the mechanism can return the + // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor + // must be stored in an fd_s instance. + class fd_r { + int fd; + public: + int getFd(fd_s ss) + { + return fd; + } + fd_r(int nfd) : fd(nfd) + { + } + }; + + constexpr static bool has_bidi_fd_watch = false; + constexpr static bool has_separate_rw_fd_watches = true; + // requires interrupt after adding/enabling an fd: + constexpr static bool interrupt_after_fd_add = true; + constexpr static bool interrupt_after_signal_add = true; + constexpr static bool supports_non_oneshot_fd = false; +}; + +namespace dprivate { +namespace select_mech { + +// We need to declare and define a non-static data variable, "siginfo_p", in this header, without +// violating the "one definition rule". 
The only way to do that is via a template, even though we +// don't otherwise need a template here: +template class sig_capture_templ +{ + public: + static siginfo_t siginfo_cap; + static sigjmp_buf rjmpbuf; + + static void signal_handler(int signo, siginfo_t *siginfo, void *v) + { + siginfo_cap = *siginfo; + siglongjmp(rjmpbuf, 1); + } +}; +template siginfo_t sig_capture_templ::siginfo_cap; +template sigjmp_buf sig_capture_templ::rjmpbuf; + +using sig_capture = sig_capture_templ<>; + +inline void prepare_signal(int signo) +{ + struct sigaction the_action; + the_action.sa_sigaction = sig_capture::signal_handler; + the_action.sa_flags = SA_SIGINFO; + sigfillset(&the_action.sa_mask); + + sigaction(signo, &the_action, nullptr); +} + +inline sigjmp_buf &get_sigreceive_jmpbuf() +{ + return sig_capture::rjmpbuf; +} + +inline void unprep_signal(int signo) +{ + signal(signo, SIG_DFL); +} + +inline siginfo_t * get_siginfo() +{ + return &sig_capture::siginfo_cap; +} + +} } // namespace dasynq :: select_mech + +template class select_events : public Base +{ + fd_set read_set; + fd_set write_set; + //fd_set error_set; // logical OR of both the above + int max_fd = 0; // highest fd in any of the sets + + sigset_t active_sigmask; // mask out unwatched signals i.e. active=0 + void * sig_userdata[NSIG]; + + // userdata pointers in read and write respectively, for each fd: + std::vector rd_udata; + std::vector wr_udata; + + // Base contains: + // lock - a lock that can be used to protect internal structure. + // receive*() methods will be called with lock held. + // receive_signal(sigdata_t &, user *) noexcept + // receive_fd_event(fd_r, user *, int flags) noexcept + + using sigdata_t = select_traits::sigdata_t; + using fd_r = typename select_traits::fd_r; + + void process_events(fd_set *read_set_p, fd_set *write_set_p, fd_set *error_set_p) + { + std::lock_guard guard(Base::lock); + + // Note: if error is set, report read-ready. + + for (int i = 0; i <= max_fd; i++) { + if (FD_ISSET(i, read_set_p) || FD_ISSET(i, error_set_p)) { + if (FD_ISSET(i, &read_set) && rd_udata[i] != nullptr) { + // report read + auto r = Base::receive_fd_event(*this, fd_r(i), rd_udata[i], IN_EVENTS); + if (std::get<0>(r) == 0) { + FD_CLR(i, &read_set); + } + } + } + } + + for (int i = 0; i <= max_fd; i++) { + if (FD_ISSET(i, write_set_p)) { + if (FD_ISSET(i, &write_set) && wr_udata[i] != nullptr) { + // report write + auto r = Base::receive_fd_event(*this, fd_r(i), wr_udata[i], OUT_EVENTS); + if (std::get<0>(r) == 0) { + FD_CLR(i, &write_set); + } + } + } + } + } + + public: + + /** + * select_events constructor. + * + * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised. 
+ */ + select_events() + { + FD_ZERO(&read_set); + FD_ZERO(&write_set); + sigfillset(&active_sigmask); + Base::init(this); + } + + ~select_events() + { + } + + // fd: file descriptor to watch + // userdata: data to associate with descriptor + // flags: IN_EVENTS | OUT_EVENTS | ONE_SHOT + // (only one of IN_EVENTS/OUT_EVENTS can be specified) + // soft_fail: true if unsupported file descriptors should fail by returning false instead + // of throwing an exception + // returns: true on success; false if file descriptor type isn't supported and emulate == true + // throws: std::system_error or std::bad_alloc on failure + bool add_fd_watch(int fd, void *userdata, int flags, bool enabled = true, bool soft_fail = false) + { + if (fd >= FD_SETSIZE) { + throw std::system_error(EMFILE, std::system_category()); + } + + if (flags & IN_EVENTS) { + FD_SET(fd, &read_set); + if (size_t(fd) >= rd_udata.size()) { + rd_udata.resize(fd + 1); + } + rd_udata[fd] = userdata; + } + else { + FD_SET(fd, &write_set); + if (size_t(fd) >= wr_udata.size()) { + wr_udata.resize(fd + 1); + } + wr_udata[fd] = userdata; + } + + max_fd = std::max(fd, max_fd); + + return true; + } + + // returns: 0 on success + // IN_EVENTS if in watch requires emulation + // OUT_EVENTS if out watch requires emulation + int add_bidi_fd_watch(int fd, void *userdata, int flags, bool emulate = false) + { + if (fd >= FD_SETSIZE) { + throw std::system_error(EMFILE, std::system_category()); + } + + if (flags & IN_EVENTS) { + FD_SET(fd, &read_set); + if (size_t(fd) >= rd_udata.size()) { + rd_udata.resize(fd + 1); + } + rd_udata[fd] = userdata; + } + if (flags & OUT_EVENTS) { + FD_SET(fd, &write_set); + if (size_t(fd) >= wr_udata.size()) { + wr_udata.resize(fd + 1); + } + wr_udata[fd] = userdata; + } + + max_fd = std::max(fd, max_fd); + + return 0; + } + + // flags specifies which watch to remove; ignored if the loop doesn't support + // separate read/write watches. + void remove_fd_watch_nolock(int fd, int flags) + { + if (flags & IN_EVENTS) { + FD_CLR(fd, &read_set); + rd_udata[fd] = nullptr; + } + if (flags & OUT_EVENTS) { + FD_CLR(fd, &write_set); + wr_udata[fd] = nullptr; + } + } + + void remove_fd_watch(int fd, int flags) + { + std::lock_guard guard(Base::lock); + remove_fd_watch_nolock(fd, flags); + } + + void remove_bidi_fd_watch(int fd) noexcept + { + FD_CLR(fd, &read_set); + FD_CLR(fd, &write_set); + } + + void enable_fd_watch_nolock(int fd, void *userdata, int flags) + { + if (flags & IN_EVENTS) { + FD_SET(fd, &read_set); + } + else { + FD_SET(fd, &write_set); + } + } + + void enable_fd_watch(int fd, void *userdata, int flags) + { + std::lock_guard guard(Base::lock); + enable_fd_watch_nolock(fd, userdata, flags); + } + + void disable_fd_watch_nolock(int fd, int flags) + { + if (flags & IN_EVENTS) { + FD_CLR(fd, &read_set); + } + else { + FD_CLR(fd, &write_set); + } + } + + void disable_fd_watch(int fd, int flags) + { + std::lock_guard guard(Base::lock); + disable_fd_watch_nolock(fd, flags); + } + + // Note signal should be masked before call. + void add_signal_watch(int signo, void *userdata) + { + std::lock_guard guard(Base::lock); + add_signal_watch_nolock(signo, userdata); + } + + // Note signal should be masked before call. 
+ void add_signal_watch_nolock(int signo, void *userdata) + { + sig_userdata[signo] = userdata; + sigdelset(&active_sigmask, signo); + dprivate::select_mech::prepare_signal(signo); + } + + // Note, called with lock held: + void rearm_signal_watch_nolock(int signo, void *userdata) noexcept + { + sig_userdata[signo] = userdata; + sigdelset(&active_sigmask, signo); + } + + void remove_signal_watch_nolock(int signo) noexcept + { + dprivate::select_mech::unprep_signal(signo); + sigaddset(&active_sigmask, signo); + sig_userdata[signo] = nullptr; + // No need to signal other threads + } + + void remove_signal_watch(int signo) noexcept + { + std::lock_guard guard(Base::lock); + remove_signal_watch_nolock(signo); + } + + public: + + // If events are pending, process an unspecified number of them. + // If no events are pending, wait until one event is received and + // process this event (and possibly any other events received + // simultaneously). + // If processing an event removes a watch, there is a possibility + // that the watched event will still be reported (if it has + // occurred) before pull_events() returns. + // + // do_wait - if false, returns immediately if no events are + // pending. + void pull_events(bool do_wait) noexcept + { + using namespace dprivate::select_mech; + + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 0; + + fd_set read_set_c; + fd_set write_set_c; + fd_set err_set; + + Base::lock.lock(); + read_set_c = read_set; + write_set_c = write_set; + err_set = read_set; + + sigset_t sigmask; + this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask); + // This is horrible, but hopefully will be optimised well. POSIX gives no way to combine signal + // sets other than this. + for (int i = 1; i < NSIG; i++) { + if (! sigismember(&active_sigmask, i)) { + sigdelset(&sigmask, i); + } + } + int nfds = max_fd + 1; + Base::lock.unlock(); + + volatile bool was_signalled = false; + + // using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is + // received during polling, it will longjmp back to here: + if (sigsetjmp(get_sigreceive_jmpbuf(), 1) != 0) { + std::atomic_signal_fence(std::memory_order_acquire); + auto * sinfo = get_siginfo(); + sigdata_t sigdata; + sigdata.info = *sinfo; + Base::lock.lock(); + void *udata = sig_userdata[sinfo->si_signo]; + if (udata != nullptr && Base::receive_signal(*this, sigdata, udata)) { + sigaddset(&sigmask, sinfo->si_signo); + sigaddset(&active_sigmask, sinfo->si_signo); + } + Base::lock.unlock(); + was_signalled = true; + } + + if (was_signalled) { + do_wait = false; + } + + std::atomic_signal_fence(std::memory_order_release); + + int r = pselect(nfds, &read_set_c, &write_set_c, &err_set, do_wait ? nullptr : &ts, &sigmask); + if (r == -1 || r == 0) { + // signal or no events + return; + } + + process_events(&read_set_c, &write_set_c, &err_set); + } +}; + +} // end namespace diff --git a/src/dasynq/dasynq-svec.h b/src/dasynq/dasynq-svec.h index d32190e..b8cc7f7 100644 --- a/src/dasynq/dasynq-svec.h +++ b/src/dasynq/dasynq-svec.h @@ -5,7 +5,11 @@ #include #include -// Vector with possibility to shrink capacity arbitrarily +// Vector with possibility to shrink capacity arbitrarily. +// +// The standard vector (std::vector) only allows shrinking a vector's capacity to its current size. In cases +// where we need to keep some reserved capacity beyond the current size, we need an alternative solution: hence, +// this class, svector. 
namespace dasynq { @@ -13,7 +17,14 @@ template class svector { private: - T * array; + union vec_node { + T elem; + + vec_node() { } + ~vec_node() { } + }; + + vec_node * array; size_t size_v; size_t capacity_v; @@ -22,10 +33,10 @@ class svector if (size_v == capacity_v) { // double capacity now: if (capacity_v == 0) capacity_v = 1; - T * new_array = new T[capacity_v * 2]; + vec_node * new_array = new vec_node[capacity_v * 2]; for (size_t i = 0; i < size_v; i++) { - new (&new_array[i]) T(std::move(array[i])); - array[i].T::~T(); + new (&new_array[i].elem) T(std::move(array[i].elem)); + array[i].elem.T::~T(); } delete[] array; array = new_array; @@ -47,14 +58,14 @@ class svector size_v = other.size_v; array = new T[capacity_v]; for (size_t i = 0; i < size_v; i++) { - new (&array[i]) T(other[i]); + new (&array[i].elem) T(other[i].elem); } } ~svector() { for (size_t i = 0; i < size_v; i++) { - array[i].T::~T(); + array[i].elem.T::~T(); } delete[] array; } @@ -62,14 +73,14 @@ class svector void push_back(const T &t) { check_capacity(); - new (&array[size_v]) T(t); + new (&array[size_v].elem) T(t); size_v++; } void push_back(T &&t) { check_capacity(); - new (&array[size_v]) T(t); + new (&array[size_v].elem) T(t); size_v++; } @@ -77,7 +88,7 @@ class svector void emplace_back(U... args) { check_capacity(); - new (&array[size_v]) T(args...); + new (&array[size_v].elem) T(args...); size_v++; } @@ -88,12 +99,12 @@ class svector T &operator[](size_t index) { - return array[index]; + return array[index].elem; } const T &operator[](size_t index) const { - return array[index]; + return array[index].elem; } size_t size() const @@ -124,10 +135,10 @@ class svector void reserve(size_t amount) { if (capacity_v < amount) { - T * new_array = new T[amount]; + vec_node * new_array = new vec_node[amount]; for (size_t i = 0; i < size_v; i++) { - new (&new_array[i]) T(std::move(array[i])); - array[i].T::~T(); + new (&new_array[i].elem) T(std::move(array[i].elem)); + array[i].elem.T::~T(); } delete[] array; array = new_array; @@ -138,13 +149,13 @@ class svector void shrink_to(size_t amount) { if (capacity_v > amount) { - T * new_array = new(std::nothrow) T[amount]; + vec_node * new_array = new(std::nothrow) vec_node[amount]; if (new_array == nullptr) { return; } for (size_t i = 0; i < size_v; i++) { - new (&new_array[i]) T(std::move(array[i])); - array[i].T::~T(); + new (&new_array[i].elem) T(std::move(array[i].elem)); + array[i].elem.T::~T(); } delete[] array; array = new_array; @@ -154,27 +165,27 @@ class svector T &back() { - return array[size_v - 1]; + return array[size_v - 1].elem; } T* begin() { - return array; + return reinterpret_cast(array); } const T *begin() const { - return array; + return reinterpret_cast(array); } T* end() { - return array + size_v; + return reinterpret_cast(array + size_v); } const T *end() const { - return array + size_v; + return reinterpret_cast(array + size_v); } }; diff --git a/src/dasynq/dasynq-timerfd.h b/src/dasynq/dasynq-timerfd.h index c3b465d..51bcdbe 100644 --- a/src/dasynq/dasynq-timerfd.h +++ b/src/dasynq/dasynq-timerfd.h @@ -96,16 +96,22 @@ template class timer_fd_events : public timer_base }; template - void receive_fd_event(T &loop_mech, typename traits_t::fd_r fd_r_a, void * userdata, int flags) + std::tuple + receive_fd_event(T &loop_mech, typename traits_t::fd_r fd_r_a, void * userdata, int flags) { if (userdata == &timerfd_fd) { process_timer(clock_type::MONOTONIC, timerfd_fd); + return std::make_tuple(IN_EVENTS, typename traits_t::fd_s(timerfd_fd)); } else if 
(userdata == &systemtime_fd) { process_timer(clock_type::SYSTEM, systemtime_fd); + if (Base::traits_t::supports_non_oneshot_fd) { + return std::make_tuple(0, typename traits_t::fd_s(systemtime_fd)); + } + return std::make_tuple(IN_EVENTS, typename traits_t::fd_s(systemtime_fd)); } else { - Base::receive_fd_event(loop_mech, fd_r_a, userdata, flags); + return Base::receive_fd_event(loop_mech, fd_r_a, userdata, flags); } } diff --git a/src/dasynq/dasynq.h b/src/dasynq/dasynq.h index 08439b8..68175c4 100644 --- a/src/dasynq/dasynq.h +++ b/src/dasynq/dasynq.h @@ -64,7 +64,23 @@ namespace dasynq { using loop_traits_t = epoll_traits; } #else -#error No loop backened defined - see dasynq-config.h +#include "dasynq-select.h" +#if _POSIX_TIMERS > 0 +#include "dasynq-posixtimer.h" +namespace dasynq { + template using timer_events = posix_timer_events; +} +#else +#include "dasynq-itimer.h" +namespace dasynq { + template using timer_events = itimer_events; +} +#endif +#include "dasynq-childproc.h" +namespace dasynq { + template using loop_t = select_events>>>; + using loop_traits_t = select_traits; +} #endif #include @@ -320,12 +336,17 @@ namespace dprivate { return true; } + // Receive fd event delivered from backend mechansim. Returns the desired watch mask, as per + // set_fd_enabled, which can be used to leave the watch disabled, re-enable it or re-enable + // one direction of a bi-directional watcher. template - void receive_fd_event(T &loop_mech, typename Traits::fd_r fd_r, void * userdata, int flags) noexcept + std::tuple receive_fd_event(T &loop_mech, typename Traits::fd_r fd_r, + void * userdata, int flags) noexcept { base_fd_watcher * bfdw = static_cast(userdata); bfdw->event_flags |= flags; + typename Traits::fd_s watch_fd_s {bfdw->watch_fd}; base_watcher * bwatcher = bfdw; @@ -345,17 +366,20 @@ namespace dprivate { queue_watcher(bwatcher); - if (! traits_t::has_separate_rw_fd_watches) { + if (is_multi_watch && ! traits_t::has_separate_rw_fd_watches) { // If this is a bidirectional fd-watch, it has been disabled in *both* directions // as the event was delivered. However, the other direction should not be disabled // yet, so we need to re-enable: int in_out_mask = IN_EVENTS | OUT_EVENTS; - if (is_multi_watch && (bfdw->watch_flags & in_out_mask) != 0) { + if ((bfdw->watch_flags & in_out_mask) != 0) { // We need to re-enable the other channel now: - loop_mech.enable_fd_watch_nolock(bfdw->watch_fd, userdata, - (bfdw->watch_flags & in_out_mask) | ONE_SHOT); + return std::make_tuple((bfdw->watch_flags & in_out_mask) | ONE_SHOT, watch_fd_s); + // We are the polling thread: don't need to interrupt polling, even if it would + // normally be required. } } + + return std::make_tuple(0, watch_fd_s); } // Child process terminated. Called with both the main lock and the reaper lock held. @@ -388,7 +412,7 @@ namespace dprivate { return r; } - // Queue a watcher for reomval, or issue "removed" callback to it. + // Queue a watcher for removal, or issue "removed" callback to it. // Call with lock free. void issue_delete(base_watcher *watcher) noexcept { @@ -545,8 +569,7 @@ class event_loop // - The mutex only protects manipulation of the wait queues, and so should not // be highly contended. - mutex_t wait_lock; // wait lock, used to prevent multiple threads from waiting - // on the event queue simultaneously. 
+ mutex_t wait_lock; // protects the wait/attention queues waitqueue attn_waitqueue; waitqueue wait_waitqueue; @@ -568,6 +591,9 @@ class event_loop loop_mech.prepare_watcher(callBack); try { loop_mech.add_signal_watch_nolock(signo, callBack); + if (backend_traits_t::interrupt_after_signal_add) { + interrupt_if_necessary(); + } } catch (...) { loop_mech.release_watcher(callBack); @@ -606,6 +632,9 @@ class event_loop } } } + else if (enabled && backend_traits_t::interrupt_after_fd_add) { + interrupt_if_necessary(); + } } catch (...) { loop_mech.release_watcher(callback); @@ -622,6 +651,7 @@ class event_loop try { loop_mech.prepare_watcher(&callback->out_watcher); try { + bool do_interrupt = false; if (backend_traits_t::has_separate_rw_fd_watches) { int r = loop_mech.add_bidi_fd_watch(fd, callback, eventmask | ONE_SHOT, emulate); if (r & IN_EVENTS) { @@ -630,12 +660,19 @@ class event_loop requeue_watcher(callback); } } + else if ((eventmask & IN_EVENTS) && backend_traits_t::interrupt_after_fd_add) { + do_interrupt = true; + } + if (r & OUT_EVENTS) { callback->out_watcher.emulatefd = true; if (eventmask & OUT_EVENTS) { requeue_watcher(&callback->out_watcher); } } + else if ((eventmask & OUT_EVENTS) && backend_traits_t::interrupt_after_fd_add) { + do_interrupt = true; + } } else { if (! loop_mech.add_fd_watch(fd, callback, eventmask | ONE_SHOT, true, emulate)) { @@ -648,6 +685,13 @@ class event_loop requeue_watcher(&callback->out_watcher); } } + else if (backend_traits_t::interrupt_after_fd_add) { + do_interrupt = true; + } + } + + if (do_interrupt) { + interrupt_if_necessary(); } } catch (...) { @@ -665,6 +709,9 @@ class event_loop { if (enabled) { loop_mech.enable_fd_watch(fd, watcher, watch_flags | ONE_SHOT); + if (backend_traits_t::interrupt_after_fd_add) { + interrupt_if_necessary(); + } } else { loop_mech.disable_fd_watch(fd, watch_flags); @@ -675,6 +722,9 @@ class event_loop { if (enabled) { loop_mech.enable_fd_watch_nolock(fd, watcher, watch_flags | ONE_SHOT); + if (backend_traits_t::interrupt_after_fd_add) { + interrupt_if_necessary(); + } } else { loop_mech.disable_fd_watch_nolock(fd, watch_flags); @@ -874,6 +924,16 @@ class event_loop } } + // Interrupt the current poll-waiter, if necessary - that is, if the loop is multi-thread safe, and if + // there is currently another thread polling the backend event mechanism. + void interrupt_if_necessary() + { + std::lock_guard guard(wait_lock); + if (! attn_waitqueue.is_empty()) { // (always false for single-threaded loops) + loop_mech.interrupt_wait(); + } + } + // Acquire the attention lock (when held, ensures that no thread is polling the AEN // mechanism). This can be used to safely remove watches, since it is certain that // notification callbacks won't be run while the attention lock is held. @@ -932,6 +992,9 @@ class event_loop // Called with lock held if (rearm_type == rearm::REARM) { loop_mech.rearm_signal_watch_nolock(bsw->siginfo.get_signo(), bsw); + if (backend_traits_t::interrupt_after_signal_add) { + interrupt_if_necessary(); + } } else if (rearm_type == rearm::REMOVE) { loop_mech.remove_signal_watch_nolock(bsw->siginfo.get_signo()); @@ -963,7 +1026,8 @@ class event_loop if (bdfw->watch_flags & IN_EVENTS) { bdfw->watch_flags &= ~IN_EVENTS; if (! emulatedfd) { - loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags); + set_fd_enabled_nolock(bdfw, bdfw->watch_fd, bdfw->watch_flags, + bdfw->watch_flags != 0); } } return rearm::NOOP; @@ -982,12 +1046,8 @@ class event_loop if (! emulatedfd) { if (! 
backend_traits_t::has_separate_rw_fd_watches) { - int watch_flags = bdfw->watch_flags; - // without separate r/w watches, enable_fd_watch actually sets - // which sides are enabled (i.e. can be used to disable): - loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, - static_cast(bdfw), - (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); + int watch_flags = bdfw->watch_flags & (IN_EVENTS | OUT_EVENTS); + set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags, watch_flags != 0); } else { loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS); @@ -1000,14 +1060,11 @@ class event_loop if (! emulatedfd) { if (! backend_traits_t::has_separate_rw_fd_watches) { int watch_flags = bdfw->watch_flags; - loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, - static_cast(bdfw), - (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); + set_fd_enabled_nolock(bdfw, bdfw->watch_fd, + watch_flags & (IN_EVENTS | OUT_EVENTS), true); } else { - loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, - static_cast(bdfw), - IN_EVENTS | ONE_SHOT); + set_fd_enabled_nolock(bdfw, bdfw->watch_fd, IN_EVENTS, true); } } else { @@ -1039,8 +1096,8 @@ class event_loop } } else if (rearm_type == rearm::REARM) { - loop_mech.enable_fd_watch_nolock(bfw->watch_fd, bfw, - (bfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); + set_fd_enabled_nolock(bfw, bfw->watch_fd, + bfw->watch_flags & (IN_EVENTS | OUT_EVENTS), true); } else if (rearm_type == rearm::DISARM) { loop_mech.disable_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags); @@ -1090,7 +1147,7 @@ class event_loop if (! bdfw->read_removed) { if (bdfw->watch_flags & OUT_EVENTS) { bdfw->watch_flags &= ~OUT_EVENTS; - loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags); + set_fd_enabled_nolock(bdfw, bdfw->watch_fd, bdfw->watch_flags, true); } return rearm::NOOP; } @@ -1106,9 +1163,7 @@ class event_loop if (! backend_traits_t::has_separate_rw_fd_watches) { int watch_flags = bdfw->watch_flags; - loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, - static_cast(bdfw), - (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); + set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags & (IN_EVENTS | OUT_EVENTS), true); } else { loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, OUT_EVENTS); @@ -1119,14 +1174,10 @@ class event_loop if (! backend_traits_t::has_separate_rw_fd_watches) { int watch_flags = bdfw->watch_flags; - loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, - static_cast(bdfw), - (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); + set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags & (IN_EVENTS | OUT_EVENTS), true); } else { - loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, - static_cast(bdfw), - OUT_EVENTS | ONE_SHOT); + set_fd_enabled_nolock(bdfw, bdfw->watch_fd, OUT_EVENTS | ONE_SHOT, true); } } return rearm_type; -- 2.25.1
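
Illustrative note (not part of the patch): the central interface change above is that the backend `receive_fd_event()` callbacks now return a `std::tuple` of (re-enable mask, fd storage) instead of `void`, and the epoll/kqueue/select mechanisms re-arm the watch only when that mask is non-zero, as described by the "Returns the desired watch mask" comment in dasynq.h. The following is a minimal, self-contained sketch of that calling convention under simplified assumptions; the names demo_queue, demo_mechanism and deliver() are hypothetical and exist only for this example — they are not Dasynq APIs.

    // Sketch of the tuple-returning event-receiver pattern used by the updated backends.
    #include <tuple>
    #include <iostream>

    namespace demo {

    constexpr int IN_EVENTS  = 1;
    constexpr int OUT_EVENTS = 2;

    // Optional fd storage, analogous to the fd_s classes in the traits above.
    struct fd_s { int fd; };

    struct demo_queue
    {
        // Analogous to event_dispatch::receive_fd_event in dasynq.h: report the event and
        // return the mask of directions to re-enable immediately (0 = leave disabled).
        std::tuple<int, fd_s> receive_fd_event(fd_s fd, void * /*userdata*/, int flags)
        {
            std::cout << "fd " << fd.fd << " ready, flags " << flags << "\n";
            // e.g. a bidirectional watch whose other direction should stay armed:
            int still_wanted = OUT_EVENTS;
            return std::make_tuple(still_wanted, fd);
        }
    };

    struct demo_mechanism : demo_queue
    {
        void enable_fd_watch_nolock(int fd, void * /*userdata*/, int flags)
        {
            std::cout << "re-enabling fd " << fd << " with mask " << flags << "\n";
        }

        // Mirrors the pull_events() loops in the epoll/kqueue backends above: deliver the
        // event, then re-arm the watch if (and only if) the returned mask is non-zero.
        void deliver(int fd, void *userdata, int flags)
        {
            auto r = receive_fd_event(fd_s{fd}, userdata, flags);
            if (std::get<0>(r) != 0) {
                enable_fd_watch_nolock(std::get<1>(r).fd, userdata, std::get<0>(r));
            }
        }
    };

    } // namespace demo

    int main()
    {
        demo::demo_mechanism mech;
        mech.deliver(5, nullptr, demo::IN_EVENTS);
        return 0;
    }

In the real headers the same pattern is driven by the new traits flags (supports_non_oneshot_fd, interrupt_after_fd_add, interrupt_after_signal_add): backends that persist non-oneshot watches return 0 to avoid a redundant re-enable, while select/pselect additionally interrupts the polling thread after adds, since its fd sets are only rebuilt on each pull_events() pass.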