// You can customise Dasynq's build options in this file. Typically, you won't need to do anything; the
// defaults are sensible for a range of operating systems, though for some BSD family OSes you may need
-// to explicitly define DASYNQ_HAVE_KQUEUE to 1. Either kqueue or epoll must be available (i.e. either
-// DASYNQ_HAVE_KQUEUE or DASYNQ_HAVE_EPOLL need to be defined to 1). There are two parts to the file: the
-// first is the custom configuration section, where you may specify custom settings, and the second
-// section contains automatic configuration to fill in remaining settings based on known features in
-// certain operating systems and compilers.
+// to explicitly define DASYNQ_HAVE_KQUEUE to 1. If neither epoll nor kqueue are available, the select-
+// based backend is used, and DASYNQ_HAVE_PSELECT must be defined (to either 1 or 0, if pselect is or is
+// not available, respectively).
+
+// There are two parts to the file: the first is the custom configuration section, where you may specify
+// custom settings, and the second section contains automatic configuration to fill in remaining settings
+// based on known features in certain operating systems and compilers.
// ---------------------------------------------------------------------------------------------------------
// Part 1: Custom configuration, please edit to suit your system / requirements.
// #define DASYNQ_HAVE_KQUEUE 1
//
// If the epoll family of system calls are available:
-// #define DASYNQ_HAVE_KQUEUE 1
+// #define DASYNQ_HAVE_EPOLL 1
//
// If the pipe2 system call is available:
// #define HAVE_PIPE2 1
//
+// If the pselect system call is available:
+// #define HAVE_PSELECT 1
+//
// A tag to include at the end of a class body for a class which is allowed to have zero size.
// Normally, C++ mandates that all objects (except empty base subobjects) have non-zero size, but on some
// compilers (at least GCC and LLVM-Clang) there are tricks to get around this awkward limitation. Note that
#if ! defined(DASYNQ_HAVE_KQUEUE)
#if defined(__OpenBSD__) || defined(__APPLE__) || defined(__FreeBSD__)
#define DASYNQ_HAVE_KQUEUE 1
-#if defined(__APPLE__)
-// kqueue on macos has "issues". See extra/macos-kqueue-bug. There is an alternate Dasyqn kqueue backend
-// which avoids the issue, which is enabled via DASYNQ_KQUEUE_MACOS_WORKAROUND.
-#define DASYNQ_KQUEUE_MACOS_WORKAROUND 1
#endif
#endif
+
+#if DASYNQ_HAVE_KQUEUE && !defined(DASYNQ_KQUEUE_MACOS_WORKAROUND) && defined(__APPLE__)
+// kqueue on macos has "issues". See extra/macos-kqueue-bug. There is an alternate Dasynq kqueue backend
+// which avoids the issue, which is enabled via DASYNQ_KQUEUE_MACOS_WORKAROUND.
+#define DASYNQ_KQUEUE_MACOS_WORKAROUND 1
#endif
-#if defined(__linux__)
#if ! defined(DASYNQ_HAVE_EPOLL)
+#if defined(__linux__)
#define DASYNQ_HAVE_EPOLL 1
#endif
#endif
+#if ! defined(DASYNQ_HAVE_PSELECT)
+#if defined(__sortix__)
+// Sortix doesn't have pselect yet (but has select):
+#define DASYNQ_HAVE_PSELECT 0
+#else
+// POSIX actually requires pselect, so we otherwise assume it's available:
+#define DASYNQ_HAVE_PSELECT 1
+#endif
+#endif
+
// General feature availability
#if (defined(__OpenBSD__) || defined(__linux__)) && ! defined(HAVE_PIPE2)
// Timer implementation based on the (basically obsolete) POSIX itimer interface.
-// With this timer implementation, we only use one clock, and allow no distinction between the
-// monotonic and system time.
+// With this timer implementation, we only have one real clock (the monotonic clock) that we can
+// run a timer against. However, if the system has both clocks, we still maintain two separate queues.
+// If provide_mono_timer is false we actually provide no system timer and rely on the event mechanism
+// which extends this class to measure time and run timeouts (via process_monotonic_timers functions).
-template <class Base> class itimer_events : public timer_base<Base>
+template <class Base, bool provide_mono_timer = true>
+class itimer_events : public timer_base<Base>
{
private:
bool receive_signal(T & loop_mech, sigdata_t &siginfo, void *userdata)
{
if (siginfo.get_signo() == SIGALRM) {
- auto &timer_queue = this->queue_for_clock(clock_type::SYSTEM);
- if (! timer_queue.empty()) {
- struct timespec curtime;
- timer_base<Base>::get_time(curtime, clock_type::SYSTEM, true);
- timer_base<Base>::process_timer_queue(timer_queue, curtime);
+ process_timers();
+ return false; // don't disable signal watch
+ }
+ else {
+ return Base::receive_signal(loop_mech, siginfo, userdata);
+ }
+ }
+
+#ifdef CLOCK_MONOTONIC
+ // We need to override the monotonic timer processing functions from timer_base to check both
+ // timer queues, since we aren't actually providing a separate timer for the system clock:
+
+ inline void process_monotonic_timers(bool &do_wait, timeval &tv, timeval *&wait_tv)
+ {
+ // We need to process both timer queues and set tv/wait_tv according to which has the
+ // timer that will expire soonest:
+ timer_base<Base>::process_timers(clock_type::MONOTONIC, do_wait, tv, wait_tv);
+
+ if (! do_wait) {
+ timer_base<Base>::process_timers(clock_type::SYSTEM, do_wait, tv, wait_tv);
+ }
+ else {
+ timeval mono_tv = tv;
+ timer_base<Base>::process_timers(clock_type::SYSTEM, do_wait, tv, wait_tv);
+ if (mono_tv.tv_sec < tv.tv_sec ||
+ (mono_tv.tv_sec == tv.tv_sec && mono_tv.tv_usec < tv.tv_usec)) {
+ tv = mono_tv;
}
-
+ }
+ }
+
+ inline void process_monotonic_timers(bool &do_wait, timespec &ts, timespec *&wait_ts)
+ {
+ // We need to process both timer queues and set tv/wait_tv according to which has the
+ // timer that will expire soonest:
+ timer_base<Base>::process_timers(clock_type::MONOTONIC, do_wait, ts, wait_ts);
+
+ if (! do_wait) {
+ timer_base<Base>::process_timers(clock_type::SYSTEM, do_wait, ts, wait_ts);
+ }
+ else {
+ timespec mono_ts = ts;
+ timer_base<Base>::process_timers(clock_type::SYSTEM, do_wait, ts, wait_ts);
+ ts = std::min(time_val(ts), time_val(mono_ts));
+ }
+ }
+
+ // Process monotonic timers based on the current clock time. However, we shadow the
+ // function and provide an implementation which also processes the system clock timer queue.
+ inline void process_monotonic_timers()
+ {
+ auto &mono_timer_queue = this->queue_for_clock(clock_type::MONOTONIC);
+ if (! mono_timer_queue.empty()) {
+ struct timespec curtime_mono;
+ timer_base<Base>::get_time(curtime_mono, clock_type::MONOTONIC, true);
+ timer_base<Base>::process_timer_queue(mono_timer_queue, curtime_mono);
+ }
+
+ auto &sys_timer_queue = this->queue_for_clock(clock_type::SYSTEM);
+ if (! sys_timer_queue.empty()) {
+ struct timespec curtime_sys;
+ timer_base<Base>::get_time(curtime_sys, clock_type::SYSTEM, true);
+ timer_base<Base>::process_timer_queue(sys_timer_queue, curtime_sys);
+ }
+ }
+
+#endif
+
+ void process_timers()
+ {
+ auto &timer_queue = this->queue_for_clock(clock_type::SYSTEM);
+ if (! timer_queue.empty()) {
+ struct timespec curtime;
+ timer_base<Base>::get_time(curtime, clock_type::SYSTEM, true);
+ timer_base<Base>::process_timer_queue(timer_queue, curtime);
+ }
+
#ifdef CLOCK_MONOTONIC
+ if (provide_mono_timer) {
auto &mono_timer_queue = this->queue_for_clock(clock_type::MONOTONIC);
if (! mono_timer_queue.empty()) {
struct timespec curtime_mono;
timer_base<Base>::get_time(curtime_mono, clock_type::MONOTONIC, true);
timer_base<Base>::process_timer_queue(mono_timer_queue, curtime_mono);
}
+ }
#endif
+ if (provide_mono_timer) {
// arm alarm with timeout from head of queue
set_timer_from_queue();
- return false; // don't disable signal watch
- }
- else {
- return Base::receive_signal(loop_mech, siginfo, userdata);
}
}
-
+
public:
class traits_t : public Base::traits_t
template <typename T> void init(T *loop_mech)
{
- sigset_t sigmask;
- this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask);
- sigaddset(&sigmask, SIGALRM);
- this->sigmaskf(SIG_SETMASK, &sigmask, nullptr);
- loop_mech->add_signal_watch(SIGALRM, nullptr);
+ if (provide_mono_timer) {
+ sigset_t sigmask;
+ this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask);
+ sigaddset(&sigmask, SIGALRM);
+ this->sigmaskf(SIG_SETMASK, &sigmask, nullptr);
+ loop_mech->add_signal_watch(SIGALRM, this);
+ }
Base::init(loop_mech);
}
ts.expiry_count = 0;
ts.enabled = enable;
+ bool do_set_timer;
if (timer_queue.is_queued(timer_id)) {
// Already queued; alter timeout
- if (timer_queue.set_priority(timer_id, timeout)) {
- set_timer_from_queue();
- }
+ do_set_timer = timer_queue.set_priority(timer_id, timeout);
}
else {
- if (timer_queue.insert(timer_id, timeout)) {
+ do_set_timer = timer_queue.insert(timer_id, timeout);
+ }
+
+ if (do_set_timer) {
+ if (provide_mono_timer) {
set_timer_from_queue();
}
+ else {
+ this->interrupt_wait();
+ }
}
}
constexpr static bool supports_non_oneshot_fd = false;
};
-#if _POSIX_REALTIME_SIGNALS > 0
-static inline void prepare_signal(int signo) { }
-static inline void unprep_signal(int signo) { }
-
-inline bool get_siginfo(int signo, siginfo_t *siginfo)
-{
- struct timespec timeout;
- timeout.tv_sec = 0;
- timeout.tv_nsec = 0;
-
- sigset_t mask;
- sigemptyset(&mask);
- sigaddset(&mask, signo);
- return (sigtimedwait(&mask, siginfo, &timeout) != -1);
-}
-#else
-
-// If we have no sigtimedwait implementation, we have to retrieve signal data by establishing a
-// signal handler.
-
-// We need to declare and define a non-static data variable, "siginfo_p", in this header, without
-// violating the "one definition rule". The only way to do that is via a template, even though we
-// don't otherwise need a template here:
-template <typename T = decltype(nullptr)> class sig_capture_templ
-{
- public:
- static siginfo_t * siginfo_p;
-
- static void signalHandler(int signo, siginfo_t *siginfo, void *v)
- {
- *siginfo_p = *siginfo;
- }
-};
-template <typename T> siginfo_t * sig_capture_templ<T>::siginfo_p = nullptr;
-
-using sig_capture = sig_capture_templ<>;
-
-inline void prepare_signal(int signo)
-{
- struct sigaction the_action;
- the_action.sa_sigaction = sig_capture::signalHandler;
- the_action.sa_flags = SA_SIGINFO;
- sigfillset(&the_action.sa_mask);
-
- sigaction(signo, &the_action, nullptr);
-}
-
-inline void unprep_signal(int signo)
-{
- signal(signo, SIG_DFL);
-}
-
-inline bool get_siginfo(int signo, siginfo_t *siginfo)
-{
- sig_capture::siginfo_p = siginfo;
-
- sigset_t mask;
- sigfillset(&mask);
- sigdelset(&mask, signo);
- sigsuspend(&mask);
- return true;
-}
-
-#endif
-
-template <class Base> class macos_kqueue_loop : public signal_events<Base,true>
+template <class Base> class macos_kqueue_loop : public signal_events<Base, true>
{
int kqfd; // kqueue fd
}
}
- // Now we disable all received events, to simulate EV_DISPATCH:
+ // Now we disable all received events, to simulate EV_DISPATCH. Note that EV_DISPATH is
+ // actually available on MacOS, but we can't use it due to the signal processing bug.
kevent(kqfd, events, r, nullptr, 0, nullptr);
}
struct kevent events[16];
struct timespec ts;
+ // wait_ts remains null for an infinite wait; it is later set to either a 0 timeout
+ // if do_wait is false (or if we otherwise won't wait due to events being detected
+ // early) or is set to an appropriate timeout for the next timer's timeout.
+ struct timespec *wait_ts = nullptr;
+
Base::lock.lock();
- sigset_t sigmask = this->get_active_sigmask();
- Base::lock.unlock();
- volatile bool was_signalled = false;
+ // Check whether any timers are pending, and what the next timeout is.
+ this->process_monotonic_timers(do_wait, ts, wait_ts);
+
+ const sigset_t &active_sigmask = this->get_active_sigmask();
+ Base::lock.unlock();
// using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is
// received during polling, it will longjmp back to here:
if (sigsetjmp(this->get_sigreceive_jmpbuf(), 1) != 0) {
- this->process_signal(sigmask);
- was_signalled = true;
- }
-
- if (was_signalled) {
+ this->process_signal();
do_wait = false;
}
- ts.tv_sec = 0;
- ts.tv_nsec = 0;
+ if (! do_wait) {
+ ts.tv_sec = 0;
+ ts.tv_nsec = 0;
+ wait_ts = &ts;
+ }
std::atomic_signal_fence(std::memory_order_release);
- this->sigmaskf(SIG_UNBLOCK, &sigmask, nullptr);
- int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts);
- this->sigmaskf(SIG_BLOCK, &sigmask, nullptr);
+
+ // Run kevent with signals unmasked:
+ this->sigmaskf(SIG_UNBLOCK, &active_sigmask, nullptr);
+ int r = kevent(kqfd, nullptr, 0, events, 16, wait_ts);
+ this->sigmaskf(SIG_BLOCK, &active_sigmask, nullptr);
+
if (r == -1 || r == 0) {
// signal or no events
+ if (r == 0 && do_wait) {
+ // timeout:
+ Base::lock.lock();
+ this->process_monotonic_timers();
+ Base::lock.unlock();
+ }
return;
}
+ ts = time_val(0, 0);
+
do {
process_events(events, r);
r = kevent(kqfd, nullptr, 0, events, 16, &ts);
struct kevent events[16];
struct timespec ts;
- ts.tv_sec = 0;
- ts.tv_nsec = 0;
- int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts);
+ // wait_ts remains null for an infinite wait; it is later set to either a 0 timeout
+ // if do_wait is false (or if we otherwise won't wait due to events being detected
+ // early) or is set to an appropriate timeout for the next timer's timeout.
+ struct timespec *wait_ts = nullptr;
+
+ // Check whether any timers are pending, and what the next timeout is.
+ Base::lock.lock();
+ this->process_monotonic_timers(do_wait, ts, wait_ts);
+ Base::lock.unlock();
+
+ if (! do_wait) {
+ ts.tv_sec = 0;
+ ts.tv_nsec = 0;
+ wait_ts = &ts;
+ }
+
+ int r = kevent(kqfd, nullptr, 0, events, 16, wait_ts);
if (r == -1 || r == 0) {
// signal or no events
+ if (r == 0 && do_wait) {
+ // timeout:
+ Base::lock.lock();
+ this->process_monotonic_timers();
+ Base::lock.unlock();
+ }
return;
}
+ ts.tv_sec = 0;
+ ts.tv_nsec = 0;
+
do {
process_events(events, r);
r = kevent(kqfd, nullptr, 0, events, 16, &ts);
// Timer implementation based on POSIX create_timer et al.
// May require linking with -lrt
-template <class Base> class posix_timer_events : public timer_base<Base>
+template <class Base, bool provide_mono_timer = true>
+class posix_timer_events : public timer_base<Base>
{
private:
timer_t real_timer;
set_timer_from_queue(real_timer, real_timer_queue);
}
- if (! mono_timer_queue.empty()) {
+ if (! mono_timer_queue.empty() && provide_mono_timer) {
this->get_time(curtime, clock_type::MONOTONIC, true);
this->process_timer_queue(mono_timer_queue, curtime);
set_timer_from_queue(mono_timer, mono_timer_queue);
this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask);
sigaddset(&sigmask, SIGALRM);
this->sigmaskf(SIG_SETMASK, &sigmask, nullptr);
- loop_mech->add_signal_watch(SIGALRM, nullptr);
+ loop_mech->add_signal_watch(SIGALRM, this);
struct sigevent timer_sigevent;
timer_sigevent.sigev_notify = SIGEV_SIGNAL;
// Create the timers; throw std::system_error if we can't.
if (timer_create(CLOCK_REALTIME, &timer_sigevent, &real_timer) == 0) {
- if (timer_create(CLOCK_MONOTONIC, &timer_sigevent, &mono_timer) != 0) {
+ if (provide_mono_timer && timer_create(CLOCK_MONOTONIC, &timer_sigevent, &mono_timer) != 0) {
timer_delete(real_timer);
throw std::system_error(errno, std::system_category());
}
if (timer_queue.is_queued(timer_id)) {
// Already queued; alter timeout
if (timer_queue.set_priority(timer_id, timeout)) {
- set_timer_from_queue(timer, timer_queue);
+ if (clock != clock_type::MONOTONIC || provide_mono_timer) {
+ set_timer_from_queue(timer, timer_queue);
+ }
}
}
else {
if (timer_queue.insert(timer_id, timeout)) {
- set_timer_from_queue(timer, timer_queue);
+ if (clock != clock_type::MONOTONIC || provide_mono_timer) {
+ set_timer_from_queue(timer, timer_queue);
+ }
}
}
}
void set_timer_rel(timer_handle_t &timer_id, const timespec &timeout, const timespec &interval,
bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
{
- // TODO consider caching current time somehow; need to decide then when to update cached value.
struct timespec curtime;
- int posix_clock_id = (clock == clock_type::MONOTONIC) ? CLOCK_MONOTONIC : CLOCK_REALTIME;
- clock_gettime(posix_clock_id, &curtime);
+ this->get_time(curtime, clock, false);
curtime.tv_sec += timeout.tv_sec;
curtime.tv_nsec += timeout.tv_nsec;
if (curtime.tv_nsec > 1000000000) {
if (timer_queue.is_queued(timer_id)) {
bool was_first = (&timer_queue.get_root()) == &timer_id;
timer_queue.remove(timer_id);
- if (was_first) {
+ if (was_first && (clock != clock_type::MONOTONIC || provide_mono_timer)) {
set_timer_from_queue(timer, timer_queue);
}
}
~posix_timer_events()
{
- timer_delete(mono_timer);
+ if (provide_mono_timer) {
+ timer_delete(mono_timer);
+ }
timer_delete(real_timer);
}
};
--- /dev/null
+#include "dasynq-select.h"
+#include "dasynq-signal.h"
+
+namespace dasynq {
+
+template <class Base> class pselect_events : public signal_events<Base, false>
+{
+ fd_set read_set;
+ fd_set write_set;
+ //fd_set error_set; // logical OR of both the above
+ int max_fd = 0; // highest fd in any of the sets
+
+ // userdata pointers in read and write respectively, for each fd:
+ std::vector<void *> rd_udata;
+ std::vector<void *> wr_udata;
+
+ // Base contains:
+ // lock - a lock that can be used to protect internal structure.
+ // receive*() methods will be called with lock held.
+ // receive_signal(sigdata_t &, user *) noexcept
+ // receive_fd_event(fd_r, user *, int flags) noexcept
+
+ using fd_r = typename select_traits::fd_r;
+
+ void process_events(fd_set *read_set_p, fd_set *write_set_p, fd_set *error_set_p)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
+ // Note: if error is set, report read-ready.
+
+ for (int i = 0; i <= max_fd; i++) {
+ if (FD_ISSET(i, read_set_p) || FD_ISSET(i, error_set_p)) {
+ if (FD_ISSET(i, &read_set) && rd_udata[i] != nullptr) {
+ // report read
+ auto r = Base::receive_fd_event(*this, fd_r(i), rd_udata[i], IN_EVENTS);
+ if (std::get<0>(r) == 0) {
+ FD_CLR(i, &read_set);
+ }
+ }
+ }
+ }
+
+ for (int i = 0; i <= max_fd; i++) {
+ if (FD_ISSET(i, write_set_p)) {
+ if (FD_ISSET(i, &write_set) && wr_udata[i] != nullptr) {
+ // report write
+ auto r = Base::receive_fd_event(*this, fd_r(i), wr_udata[i], OUT_EVENTS);
+ if (std::get<0>(r) == 0) {
+ FD_CLR(i, &write_set);
+ }
+ }
+ }
+ }
+ }
+
+ public:
+
+ /**
+ * pselect_events constructor.
+ *
+ * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
+ */
+ pselect_events()
+ {
+ FD_ZERO(&read_set);
+ FD_ZERO(&write_set);
+ Base::init(this);
+ }
+
+ ~pselect_events()
+ {
+ }
+
+ // fd: file descriptor to watch
+ // userdata: data to associate with descriptor
+ // flags: IN_EVENTS | OUT_EVENTS | ONE_SHOT
+ // (only one of IN_EVENTS/OUT_EVENTS can be specified)
+ // soft_fail: true if unsupported file descriptors should fail by returning false instead
+ // of throwing an exception
+ // returns: true on success; false if file descriptor type isn't supported and emulate == true
+ // throws: std::system_error or std::bad_alloc on failure
+ bool add_fd_watch(int fd, void *userdata, int flags, bool enabled = true, bool soft_fail = false)
+ {
+ if (fd >= FD_SETSIZE) {
+ throw std::system_error(EMFILE, std::system_category());
+ }
+
+ if (flags & IN_EVENTS) {
+ FD_SET(fd, &read_set);
+ if (size_t(fd) >= rd_udata.size()) {
+ rd_udata.resize(fd + 1);
+ }
+ rd_udata[fd] = userdata;
+ }
+ else {
+ FD_SET(fd, &write_set);
+ if (size_t(fd) >= wr_udata.size()) {
+ wr_udata.resize(fd + 1);
+ }
+ wr_udata[fd] = userdata;
+ }
+
+ max_fd = std::max(fd, max_fd);
+
+ return true;
+ }
+
+ // returns: 0 on success
+ // IN_EVENTS if in watch requires emulation
+ // OUT_EVENTS if out watch requires emulation
+ int add_bidi_fd_watch(int fd, void *userdata, int flags, bool emulate = false)
+ {
+ if (fd >= FD_SETSIZE) {
+ throw std::system_error(EMFILE, std::system_category());
+ }
+
+ if (flags & IN_EVENTS) {
+ FD_SET(fd, &read_set);
+ if (size_t(fd) >= rd_udata.size()) {
+ rd_udata.resize(fd + 1);
+ }
+ rd_udata[fd] = userdata;
+ }
+ if (flags & OUT_EVENTS) {
+ FD_SET(fd, &write_set);
+ if (size_t(fd) >= wr_udata.size()) {
+ wr_udata.resize(fd + 1);
+ }
+ wr_udata[fd] = userdata;
+ }
+
+ max_fd = std::max(fd, max_fd);
+
+ return 0;
+ }
+
+ // flags specifies which watch to remove; ignored if the loop doesn't support
+ // separate read/write watches.
+ void remove_fd_watch_nolock(int fd, int flags)
+ {
+ if (flags & IN_EVENTS) {
+ FD_CLR(fd, &read_set);
+ rd_udata[fd] = nullptr;
+ }
+ if (flags & OUT_EVENTS) {
+ FD_CLR(fd, &write_set);
+ wr_udata[fd] = nullptr;
+ }
+ }
+
+ void remove_fd_watch(int fd, int flags)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ remove_fd_watch_nolock(fd, flags);
+ }
+
+ void remove_bidi_fd_watch(int fd) noexcept
+ {
+ FD_CLR(fd, &read_set);
+ FD_CLR(fd, &write_set);
+ }
+
+ void enable_fd_watch_nolock(int fd, void *userdata, int flags)
+ {
+ if (flags & IN_EVENTS) {
+ FD_SET(fd, &read_set);
+ }
+ else {
+ FD_SET(fd, &write_set);
+ }
+ }
+
+ void enable_fd_watch(int fd, void *userdata, int flags)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ enable_fd_watch_nolock(fd, userdata, flags);
+ }
+
+ void disable_fd_watch_nolock(int fd, int flags)
+ {
+ if (flags & IN_EVENTS) {
+ FD_CLR(fd, &read_set);
+ }
+ else {
+ FD_CLR(fd, &write_set);
+ }
+ }
+
+ void disable_fd_watch(int fd, int flags)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ disable_fd_watch_nolock(fd, flags);
+ }
+
+ // If events are pending, process an unspecified number of them.
+ // If no events are pending, wait until one event is received and
+ // process this event (and possibly any other events received
+ // simultaneously).
+ // If processing an event removes a watch, there is a possibility
+ // that the watched event will still be reported (if it has
+ // occurred) before pull_events() returns.
+ //
+ // do_wait - if false, returns immediately if no events are
+ // pending.
+ void pull_events(bool do_wait) noexcept
+ {
+ struct timespec ts;
+ struct timespec *wait_ts = nullptr;
+
+ Base::lock.lock();
+
+ // Check whether any timers are pending, and what the next timeout is.
+ this->process_monotonic_timers(do_wait, ts, wait_ts);
+
+ fd_set read_set_c;
+ fd_set write_set_c;
+ fd_set err_set;
+
+ read_set_c = read_set;
+ write_set_c = write_set;
+ err_set = read_set;
+
+ const sigset_t &active_sigmask = this->get_active_sigmask();
+
+ sigset_t sigmask;
+ this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask);
+
+ // This is horrible, but hopefully will be optimised well. POSIX gives no way to combine signal
+ // sets other than this.
+ for (int i = 1; i < NSIG; i++) {
+ if (! sigismember(&active_sigmask, i)) {
+ sigdelset(&sigmask, i);
+ }
+ }
+ int nfds = max_fd + 1;
+ Base::lock.unlock();
+
+ // using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is
+ // received during polling, it will longjmp back to here:
+ if (sigsetjmp(this->get_sigreceive_jmpbuf(), 1) != 0) {
+ this->process_signal(sigmask);
+ do_wait = false;
+ }
+
+ if (! do_wait) {
+ ts.tv_sec = 0;
+ ts.tv_nsec = 0;
+ wait_ts = &ts;
+ }
+
+ std::atomic_signal_fence(std::memory_order_release);
+
+ int r = pselect(nfds, &read_set_c, &write_set_c, &err_set, wait_ts, &sigmask);
+
+ if (r == -1 || r == 0) {
+ // signal or no events
+ if (r == 0) {
+ if (! do_wait) {
+ // At least on Mac OS, pselect doesn't seem to give us a pending signal
+ // if we have a zero timeout. Force detection using sigmask:
+ sigset_t origmask;
+ this->sigmaskf(SIG_SETMASK, &sigmask, &origmask);
+ this->sigmaskf(SIG_SETMASK, &origmask, nullptr);
+ }
+
+ if (r == 0 && do_wait) {
+ // timeout:
+ Base::lock.lock();
+ this->process_monotonic_timers();
+ Base::lock.unlock();
+ }
+ }
+ return;
+ }
+
+ process_events(&read_set_c, &write_set_c, &err_set);
+ }
+};
+
+} // end namespace
+#ifndef DAYSNQ_SELECT_INCLUDED
+#define DASYNQ_SELECT_INDCLUDED 1
+
#include <system_error>
#include <vector>
#include <atomic>
constexpr static bool supports_non_oneshot_fd = false;
};
-template <class Base> class select_events : public signal_events<Base>
+template <class Base> class select_events : public signal_events<Base, true>
{
fd_set read_set;
fd_set write_set;
// pending.
void pull_events(bool do_wait) noexcept
{
- //using namespace dprivate::select_mech;
+ struct timeval ts;
+ struct timeval *wait_ts = nullptr;
+
+ Base::lock.lock();
- struct timespec ts;
- ts.tv_sec = 0;
- ts.tv_nsec = 0;
+ // Check whether any timers are pending, and what the next timeout is.
+ // Check whether any timers are pending, and what the next timeout is.
+ this->process_monotonic_timers(do_wait, ts, wait_ts);
fd_set read_set_c;
fd_set write_set_c;
fd_set err_set;
- Base::lock.lock();
read_set_c = read_set;
write_set_c = write_set;
err_set = read_set;
const sigset_t &active_sigmask = this->get_active_sigmask();
- sigset_t sigmask;
- this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask);
- // This is horrible, but hopefully will be optimised well. POSIX gives no way to combine signal
- // sets other than this.
- for (int i = 1; i < NSIG; i++) {
- if (! sigismember(&active_sigmask, i)) {
- sigdelset(&sigmask, i);
- }
- }
int nfds = max_fd + 1;
Base::lock.unlock();
- volatile bool was_signalled = false;
-
// using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is
// received during polling, it will longjmp back to here:
if (sigsetjmp(this->get_sigreceive_jmpbuf(), 1) != 0) {
- this->process_signal(sigmask);
- was_signalled = true;
+ this->process_signal();
+ do_wait = false;
}
- if (was_signalled) {
- do_wait = false;
+ if (! do_wait) {
+ ts.tv_sec = 0;
+ ts.tv_usec = 0;
+ wait_ts = &ts;
}
std::atomic_signal_fence(std::memory_order_release);
- int r = pselect(nfds, &read_set_c, &write_set_c, &err_set, do_wait ? nullptr : &ts, &sigmask);
+ this->sigmaskf(SIG_UNBLOCK, &active_sigmask, nullptr);
+ int r = select(nfds, &read_set_c, &write_set_c, &err_set, wait_ts);
+ this->sigmaskf(SIG_BLOCK, &active_sigmask, nullptr);
+
if (r == -1 || r == 0) {
// signal or no events
+ if (r == 0 && do_wait) {
+ // timeout:
+ Base::lock.lock();
+ this->process_monotonic_timers();
+ Base::lock.unlock();
+ }
+
return;
}
};
} // end namespace
+
+#endif
+#ifndef DASYNQ_SIGNAL_INCLUDED
+#define DASYNQ_SIGNAL_INCLUDED 1
+
#include <atomic>
#include <signal.h>
}
}
+ // Get the active signal mask - identifying the set of signals which have an enabled watcher.
+ // if mask_enables is true, the returned set contains the active signals; otherwise, it
+ // contains all inactive signals.
const sigset_t &get_active_sigmask()
{
return active_sigmask;
return dprivate::signal_mech::get_sigreceive_jmpbuf();
}
- void process_signal(sigset_t &sigmask)
+ // process a received signal
+ void process_signal()
{
using namespace dprivate::signal_mech;
std::atomic_signal_fence(std::memory_order_acquire);
void *udata = sig_userdata[sinfo->si_signo];
if (udata != nullptr && Base::receive_signal(*this, sigdata, udata)) {
if (mask_enables) {
- sigdelset(&sigmask, sinfo->si_signo);
sigdelset(&active_sigmask, sinfo->si_signo);
}
else {
+ sigaddset(&active_sigmask, sinfo->si_signo);
+ }
+ }
+ Base::lock.unlock();
+ }
+
+ // process a received signal, and update sigmask - which should reflect the inverse of the
+ // active signal mask.
+ void process_signal(sigset_t &sigmask)
+ {
+ using namespace dprivate::signal_mech;
+ std::atomic_signal_fence(std::memory_order_acquire);
+ auto * sinfo = get_siginfo();
+ sigdata_t sigdata;
+ sigdata.info = *sinfo;
+
+ Base::lock.lock();
+ void *udata = sig_userdata[sinfo->si_signo];
+ if (udata != nullptr && Base::receive_signal(*this, sigdata, udata)) {
+ if (mask_enables) {
sigaddset(&sigmask, sinfo->si_signo);
+ sigdelset(&active_sigmask, sinfo->si_signo);
+ }
+ else {
+ sigdelset(&sigmask, sinfo->si_signo);
sigaddset(&active_sigmask, sinfo->si_signo);
}
}
};
}
+
+#endif
#include <utility>
#include <mutex>
+#include <time.h>
+
#include "dasynq-daryheap.h"
namespace dasynq {
return timer_queue.empty() && mono_timer_queue.empty();
}
#else
+ // If there is no monotonic clock, map both clock_type::MONOTONIC and clock_type::SYSTEM to a
+ // single clock (based on gettimeofday).
protected:
inline timer_queue_t &queue_for_clock(clock_type clock)
{
#endif
// For the specified timer queue, issue expirations for all timers set to expire on or before the given
- // time (curtime). The timer queue must not be empty.
+ // time (curtime).
void process_timer_queue(timer_queue_t &queue, const struct timespec &curtime) noexcept
{
+ if (queue.empty()) return;
+
// Peek timer queue; calculate difference between current time and timeout
const time_val * timeout = &queue.get_root_priority();
time_val curtime_tv = curtime;
}
}
+ // Process timers based on the current clock time. If any timers have expired,
+ // set do_wait to false; otherwise, if any timers are pending, set ts to the delay before
+ // the next timer expires and set wait_ts to &ts.
+ // (If no timers are active, none of the output parameters are set).
+ inline void process_timers(clock_type clock, bool &do_wait, timespec &ts, timespec *&wait_ts)
+ {
+ timespec now;
+ auto &timer_q = this->queue_for_clock(clock);
+ this->get_time(now, clock, true);
+ if (! timer_q.empty()) {
+ const time_val &timeout = timer_q.get_root_priority();
+ if (timeout <= now) {
+ this->process_timer_queue(timer_q, now);
+ do_wait = false; // don't wait, we have events already
+ }
+ else if (do_wait) {
+ ts = (timeout - now);
+ wait_ts = &ts;
+ }
+ }
+ }
+
+ // Process timers based on the current clock time. If any timers have expired,
+ // set do_wait to false; otherwise, if any timers are pending, set tv to the delay before
+ // the next timer expires and set wait_tv to &tv.
+ // (If no timers are active, none of the output parameters are set).
+ inline void process_timers(clock_type clock, bool &do_wait, timeval &tv, timeval *&wait_tv)
+ {
+ timespec now;
+ auto &timer_q = this->queue_for_clock(clock);
+ this->get_time(now, clock, true);
+ if (! timer_q.empty()) {
+ const time_val &timeout = timer_q.get_root_priority();
+ if (timeout <= now) {
+ this->process_timer_queue(timer_q, now);
+ do_wait = false; // don't wait, we have events already
+ }
+ else if (do_wait) {
+ time_val delay = (timeout - now);
+ tv.tv_sec = delay.seconds();
+ tv.tv_usec = (delay.nseconds() + 999) / 1000;
+ wait_tv = &tv;
+ }
+ }
+ }
+
+ // Process monotonic timers based on the current clock time.
+ inline void process_monotonic_timers()
+ {
+ timespec now;
+ auto &timer_q = this->queue_for_clock(clock_type::MONOTONIC);
+ this->get_time(now, clock_type::MONOTONIC, true);
+ process_timer_queue(timer_q, now);
+ }
+
+ // Process monotonic timers based on the current clock time. If any timers have expired,
+ // set do_wait to false; otherwise, if any timers are pending, set ts to the delay before
+ // the next timer expires and set wait_ts to &ts.
+ // (If no timers are active, none of the output parameters are set).
+ inline void process_monotonic_timers(bool &do_wait, timespec &ts, timespec *&wait_ts)
+ {
+ process_timers(clock_type::MONOTONIC, do_wait, ts, wait_ts);
+ }
+
+ // Process monotonic timers based on the current clock time. If any timers have expired,
+ // set do_wait to false; otherwise, if any timers are pending, set ts to the delay before
+ // the next timer expires and set wait_ts to &ts.
+ // (If no timers are active, none of the output parameters are set).
+ inline void process_monotonic_timers(bool &do_wait, timeval &tv, timeval *&wait_tv)
+ {
+ process_timers(clock_type::MONOTONIC, do_wait, tv, wait_tv);
+ }
+
public:
void get_time(time_val &tv, clock_type clock, bool force_update) noexcept
#if _POSIX_TIMERS > 0
#include "dasynq-posixtimer.h"
namespace dasynq {
- template <typename T> using timer_events = posix_timer_events<T>;
+ template <typename T, bool provide_mono_timer = true> using timer_events = posix_timer_events<T, provide_mono_timer>;
}
#else
#include "dasynq-itimer.h"
namespace dasynq {
- template <typename T> using timer_events = itimer_events<T>;
+ template <typename T, bool provide_mono_timer = true> using timer_events = itimer_events<T, provide_mono_timer>;
}
#endif
#endif
#include "dasynq-kqueue-macos.h"
#include "dasynq-childproc.h"
namespace dasynq {
- template <typename T> using loop_t = macos_kqueue_loop<interrupt_channel<timer_events<child_proc_events<T>>>>;
+ template <typename T> using loop_t = macos_kqueue_loop<timer_events<child_proc_events<interrupt_channel<T>>, false>>;
using loop_traits_t = macos_kqueue_traits;
}
#else
#include "dasynq-kqueue.h"
#include "dasynq-childproc.h"
namespace dasynq {
- template <typename T> using loop_t = kqueue_loop<interrupt_channel<timer_events<child_proc_events<T>>>>;
+ template <typename T> using loop_t = kqueue_loop<timer_events<child_proc_events<interrupt_channel<T>>, false>>;
using loop_traits_t = kqueue_traits;
}
#endif
using loop_traits_t = epoll_traits;
}
#else
-#include "dasynq-select.h"
-#if _POSIX_TIMERS > 0
-#include "dasynq-posixtimer.h"
+#include "dasynq-childproc.h"
+#if DASYNQ_HAVE_PSELECT
+#include "dasynq-pselect.h"
namespace dasynq {
- template <typename T> using timer_events = posix_timer_events<T>;
+ template <typename T> using loop_t = pselect_events<timer_events<interrupt_channel<child_proc_events<T>>, false>>;
+ using loop_traits_t = select_traits;
}
#else
-#include "dasynq-itimer.h"
-namespace dasynq {
- template <typename T> using timer_events = itimer_events<T>;
-}
-#endif
-#include "dasynq-childproc.h"
+#include "dasynq-select.h"
namespace dasynq {
- template <typename T> using loop_t = select_events<interrupt_channel<timer_events<child_proc_events<T>>>>;
+ template <typename T> using loop_t = select_events<timer_events<interrupt_channel<child_proc_events<T>>, false>>;
using loop_traits_t = select_traits;
}
#endif
+#endif
#include <atomic>
#include <condition_variable>
template <typename T>
static timer<EventLoop> *add_timer(EventLoop &eloop, clock_type clock, bool relative,
- struct timespec &timeout, struct timespec &interval, T watch_hndlr)
+ const timespec &timeout, const timespec &interval, T watch_hndlr)
{
class lambda_timer : public timer_impl<event_loop_t, lambda_timer>
{