TIMER
};
- template <typename T_Mutex, typename Traits, typename LoopTraits> class event_dispatch;
+ template <typename Traits, typename LoopTraits> class event_dispatch;
// For FD watchers:
// Use this watch flag to indicate that in and out events should be reported separately,
};
// Base signal event - not part of public API
- template <typename T_Mutex, typename T_Sigdata>
+ template <typename T_Sigdata>
class base_signal_watcher : public base_watcher
{
- template <typename, typename, typename> friend class event_dispatch;
+ template <typename, typename> friend class event_dispatch;
template <typename, typename> friend class dasynq::event_loop;
protected:
typedef siginfo_t &siginfo_p;
};
- template <typename T_Mutex>
class base_fd_watcher : public base_watcher
{
- template <typename, typename, typename> friend class event_dispatch;
+ template <typename, typename> friend class event_dispatch;
template <typename, typename> friend class dasynq::event_loop;
protected:
base_fd_watcher() noexcept : base_watcher(watch_type_t::FD) { }
};
- template <typename T_Mutex>
- class base_bidi_fd_watcher : public base_fd_watcher<T_Mutex>
+ class base_bidi_fd_watcher : public base_fd_watcher
{
- template <typename, typename, typename> friend class event_dispatch;
+ template <typename, typename> friend class event_dispatch;
template <typename, typename> friend class dasynq::event_loop;
base_bidi_fd_watcher(const base_bidi_fd_watcher &) = delete;
int write_removed : 1; // write watch removed?
};
- template <typename T_Mutex>
class base_child_watcher : public base_watcher
{
- template <typename, typename, typename> friend class event_dispatch;
+ template <typename, typename> friend class event_dispatch;
template <typename, typename> friend class dasynq::event_loop;
protected:
};
- template <typename T_Mutex>
class base_timer_watcher : public base_watcher
{
- template <typename, typename, typename> friend class event_dispatch;
+ template <typename, typename> friend class event_dispatch;
template <typename, typename> friend class dasynq::event_loop;
protected:
+#include <sys/types.h>
+#include <sys/wait.h>
+
#include <signal.h>
+
#include "dasynq-btree_set.h"
namespace dasynq {
#if ! defined(DASYNQ_HAVE_KQUEUE)
#if defined(__OpenBSD__) || defined(__APPLE__) || defined(__FreeBSD__)
#define DASYNQ_HAVE_KQUEUE 1
+#if defined(__APPLE__)
+// kqueue on macos has "issues". See extra/macos-kqueue-bug. There is an alternate Dasyqn kqueue backend
+// which avoids the issue, which is enabled via DASYNQ_KQUEUE_MACOS_WORKAROUND.
+#define DASYNQ_KQUEUE_MACOS_WORKAROUND 1
+#endif
#endif
#endif
#include <vector>
#include <utility>
+#include <signal.h>
#include <sys/time.h>
#include <time.h>
--- /dev/null
+#include <system_error>
+#include <type_traits>
+#include <unordered_map>
+#include <vector>
+#include <tuple>
+
+#include <sys/event.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <sys/stat.h>
+
+#include <unistd.h>
+#include <signal.h>
+
+#include "dasynq-config.h"
+#include "dasynq-signal.h"
+
+// "kqueue"-based event loop mechanism.
+//
+// kqueue is available on BSDs and Mac OS X, though there are subtle differences from OS to OS.
+//
+// kqueue supports watching file descriptors (input and output as separate watches only),
+// signals, child processes, and timers. Unfortunately support for the latter two is imperfect;
+// it is not possible to reserve process watches in advance; timers can only be active, count
+// down immediately when created, and cannot be reset to another time. For timers especially
+// the problems are significant: we can't allocate timers in advance, and we can't even feasibly
+// manage our own timer queue via a single kqueue-backed timer. Therefore, an alternate timer
+// mechanism must be used together with kqueue.
+
+namespace dasynq {
+
+template <class Base> class kqueue_loop;
+
+class macos_kqueue_traits : public signal_traits
+{
+ template <class Base> friend class macos_kqueue_loop;
+
+ public:
+
+ class fd_r;
+
+ // File descriptor optional storage. If the mechanism can return the file descriptor, this
+ // class will be empty, otherwise it can hold a file descriptor.
+ class fd_s {
+ public:
+ fd_s(int) { }
+
+ DASYNQ_EMPTY_BODY
+ };
+
+ // File descriptor reference (passed to event callback). If the mechanism can return the
+ // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor
+ // must be stored in an fd_s instance.
+ class fd_r {
+ int fd;
+ public:
+ int getFd(fd_s ss)
+ {
+ return fd;
+ }
+ fd_r(int nfd) : fd(nfd)
+ {
+ }
+ };
+
+ constexpr static bool has_bidi_fd_watch = false;
+ constexpr static bool has_separate_rw_fd_watches = true;
+ constexpr static bool interrupt_after_fd_add = false;
+ constexpr static bool supports_non_oneshot_fd = false;
+};
+
+#if _POSIX_REALTIME_SIGNALS > 0
+static inline void prepare_signal(int signo) { }
+static inline void unprep_signal(int signo) { }
+
+inline bool get_siginfo(int signo, siginfo_t *siginfo)
+{
+ struct timespec timeout;
+ timeout.tv_sec = 0;
+ timeout.tv_nsec = 0;
+
+ sigset_t mask;
+ sigemptyset(&mask);
+ sigaddset(&mask, signo);
+ return (sigtimedwait(&mask, siginfo, &timeout) != -1);
+}
+#else
+
+// If we have no sigtimedwait implementation, we have to retrieve signal data by establishing a
+// signal handler.
+
+// We need to declare and define a non-static data variable, "siginfo_p", in this header, without
+// violating the "one definition rule". The only way to do that is via a template, even though we
+// don't otherwise need a template here:
+template <typename T = decltype(nullptr)> class sig_capture_templ
+{
+ public:
+ static siginfo_t * siginfo_p;
+
+ static void signalHandler(int signo, siginfo_t *siginfo, void *v)
+ {
+ *siginfo_p = *siginfo;
+ }
+};
+template <typename T> siginfo_t * sig_capture_templ<T>::siginfo_p = nullptr;
+
+using sig_capture = sig_capture_templ<>;
+
+inline void prepare_signal(int signo)
+{
+ struct sigaction the_action;
+ the_action.sa_sigaction = sig_capture::signalHandler;
+ the_action.sa_flags = SA_SIGINFO;
+ sigfillset(&the_action.sa_mask);
+
+ sigaction(signo, &the_action, nullptr);
+}
+
+inline void unprep_signal(int signo)
+{
+ signal(signo, SIG_DFL);
+}
+
+inline bool get_siginfo(int signo, siginfo_t *siginfo)
+{
+ sig_capture::siginfo_p = siginfo;
+
+ sigset_t mask;
+ sigfillset(&mask);
+ sigdelset(&mask, signo);
+ sigsuspend(&mask);
+ return true;
+}
+
+#endif
+
+template <class Base> class macos_kqueue_loop : public signal_events<Base,true>
+{
+ int kqfd; // kqueue fd
+
+ // Base contains:
+ // lock - a lock that can be used to protect internal structure.
+ // receive*() methods will be called with lock held.
+ // receive_signal(sigdata_t &, user *) noexcept
+ // receive_fd_event(fd_r, user *, int flags) noexcept
+
+ using fd_r = typename macos_kqueue_traits::fd_r;
+
+ // The flag to specify poll() semantics for regular file readiness: that is, we want
+ // ready-for-read to be returned even at end of file:
+#if defined(NOTE_FILE_POLL)
+ // FreeBSD:
+ constexpr static int POLL_SEMANTICS = NOTE_FILE_POLL;
+#else
+ // Note that macOS has an "EV_POLL" defined that looks like it should give poll semantics
+ // when passed as a flag. However, it is filtered at the syscall entry so we cannot use it in
+ // kqueue. (The kernel uses it internally to implement poll()).
+ constexpr static int POLL_SEMANTICS = 0;
+#endif
+
+ void process_events(struct kevent *events, int r)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
+ for (int i = 0; i < r; i++) {
+ if (events[i].filter == EVFILT_READ || events[i].filter == EVFILT_WRITE) {
+ int flags = events[i].filter == EVFILT_READ ? IN_EVENTS : OUT_EVENTS;
+ auto r = Base::receive_fd_event(*this, fd_r(events[i].ident), events[i].udata, flags);
+ if (std::get<0>(r) == 0) {
+ // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for
+ // another connection).
+ events[i].flags = EV_DISABLE | EV_CLEAR;
+ }
+ else {
+ events[i].flags = EV_ENABLE;
+ }
+ }
+ else {
+ events[i].flags = EV_DISABLE;
+ }
+ }
+
+ // Now we disable all received events, to simulate EV_DISPATCH:
+ kevent(kqfd, events, r, nullptr, 0, nullptr);
+ }
+
+ public:
+
+ /**
+ * kqueue_loop constructor.
+ *
+ * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
+ */
+ macos_kqueue_loop()
+ {
+ kqfd = kqueue();
+ if (kqfd == -1) {
+ throw std::system_error(errno, std::system_category());
+ }
+ Base::init(this);
+ }
+
+ ~macos_kqueue_loop()
+ {
+ close(kqfd);
+ }
+
+ void set_filter_enabled(short filterType, uintptr_t ident, void *udata, bool enable)
+ {
+ // Note, on OpenBSD enabling or disabling filter will not alter the filter parameters (udata etc);
+ // on OS X however, it will. Therefore we set udata here (to the same value as it was originally
+ // set) in order to work correctly on both kernels.
+ struct kevent kev;
+ int fflags = (filterType == EVFILT_READ) ? POLL_SEMANTICS : 0;
+ EV_SET(&kev, ident, filterType, enable ? EV_ENABLE : EV_DISABLE, fflags, 0, udata);
+ kevent(kqfd, &kev, 1, nullptr, 0, nullptr);
+ }
+
+ void remove_filter(short filterType, uintptr_t ident)
+ {
+ struct kevent kev;
+ EV_SET(&kev, ident, filterType, EV_DELETE, 0, 0, 0);
+ kevent(kqfd, &kev, 1, nullptr, 0, nullptr);
+ }
+
+ // fd: file descriptor to watch
+ // userdata: data to associate with descriptor
+ // flags: IN_EVENTS | OUT_EVENTS | ONE_SHOT
+ // (only one of IN_EVENTS/OUT_EVENTS can be specified)
+ // soft_fail: true if unsupported file descriptors should fail by returning false instead
+ // of throwing an exception
+ // returns: true on success; false if file descriptor type isn't supported and emulate == true
+ // throws: std::system_error or std::bad_alloc on failure
+ bool add_fd_watch(int fd, void *userdata, int flags, bool enabled = true, bool emulate = false)
+ {
+ short filter = (flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE;
+
+ if (filter == EVFILT_READ && POLL_SEMANTICS == 0 && emulate) {
+ // We can't request poll semantics, so check for regular file:
+ struct stat statbuf;
+ if (fstat(fd, &statbuf) == -1) {
+ throw new std::system_error(errno, std::system_category());
+ }
+ if ((statbuf.st_mode & S_IFMT) == S_IFREG) {
+ // Regular file: emulation required
+ return false;
+ }
+ }
+
+ int fflags = (filter == EVFILT_READ) ? POLL_SEMANTICS : 0;
+
+ struct kevent kev;
+ EV_SET(&kev, fd, filter, EV_ADD | (enabled ? 0 : EV_DISABLE), fflags, 0, userdata);
+ if (kevent(kqfd, &kev, 1, nullptr, 0, nullptr) == -1) {
+ // Note that kqueue supports EVFILT_READ on regular file fd's, but not EVFILT_WRITE.
+ if (filter == EVFILT_WRITE && errno == EINVAL && emulate) {
+ return false; // emulate
+ }
+ throw new std::system_error(errno, std::system_category());
+ }
+ return true;
+ }
+
+ // returns: 0 on success
+ // IN_EVENTS if in watch requires emulation
+ // OUT_EVENTS if out watch requires emulation
+ int add_bidi_fd_watch(int fd, void *userdata, int flags, bool emulate = false)
+ {
+#ifdef EV_RECEIPT
+ struct kevent kev[2];
+ struct kevent kev_r[2];
+ short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT;
+ short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT;
+ EV_SET(&kev[0], fd, EVFILT_READ, rflags, POLL_SEMANTICS, 0, userdata);
+ EV_SET(&kev[1], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
+
+ int r = kevent(kqfd, kev, 2, kev_r, 2, nullptr);
+
+ if (r == -1) {
+ throw new std::system_error(errno, std::system_category());
+ }
+
+ // Some possibilities:
+ // - both ends failed. We'll throw an error rather than allowing emulation.
+ // - read watch failed, write succeeded : should not happen.
+ // - read watch added, write failed: if emulate == true, succeed;
+ // if emulate == false, remove read and fail.
+
+ if (kev_r[0].data != 0) {
+ // read failed
+ throw new std::system_error(kev_r[0].data, std::system_category());
+ }
+
+ if (kev_r[1].data != 0) {
+ if (emulate) {
+ // We can emulate, but, do we have correct semantics?
+ if (POLL_SEMANTICS != 0) {
+ return OUT_EVENTS;
+ }
+
+ // if we can't get poll semantics, emulate for read as well:
+ // first remove read watch:
+ EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
+ kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+ return IN_EVENTS | OUT_EVENTS;
+ }
+ // remove read watch
+ EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
+ kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+ // throw exception
+ throw new std::system_error(kev_r[1].data, std::system_category());
+ }
+
+ return 0;
+#else
+ // OpenBSD doesn't have EV_RECEIPT: install the watches one at a time
+ struct kevent kev[1];
+
+ short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE);
+ short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE);
+ EV_SET(&kev[0], fd, EVFILT_READ, rflags, 0, 0, userdata);
+
+ int r = kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+
+ if (r == -1) {
+ throw new std::system_error(errno, std::system_category());
+ }
+
+ EV_SET(&kev[0], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
+
+ r = kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+
+ if (r == -1) {
+ if (emulate) {
+ return OUT_EVENTS;
+ }
+ // remove read watch
+ EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
+ kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+ // throw exception
+ throw new std::system_error(errno, std::system_category());
+ }
+
+ return 0;
+#endif
+ }
+
+ // flags specifies which watch to remove; ignored if the loop doesn't support
+ // separate read/write watches.
+ void remove_fd_watch(int fd, int flags)
+ {
+ remove_filter((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd);
+ }
+
+ void remove_fd_watch_nolock(int fd, int flags)
+ {
+ remove_fd_watch(fd, flags);
+ }
+
+ void remove_bidi_fd_watch(int fd) noexcept
+ {
+ struct kevent kev[2];
+ EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
+ EV_SET(&kev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
+
+ kevent(kqfd, kev, 2, nullptr, 0, nullptr);
+ }
+
+ void enable_fd_watch(int fd, void *userdata, int flags)
+ {
+ set_filter_enabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, userdata, true);
+ }
+
+ void enable_fd_watch_nolock(int fd, void *userdata, int flags)
+ {
+ enable_fd_watch(fd, userdata, flags);
+ }
+
+ void disable_fd_watch(int fd, int flags)
+ {
+ set_filter_enabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, nullptr, false);
+ }
+
+ void disable_fd_watch_nolock(int fd, int flags)
+ {
+ disable_fd_watch(fd, flags);
+ }
+
+ // If events are pending, process an unspecified number of them.
+ // If no events are pending, wait until one event is received and
+ // process this event (and possibly any other events received
+ // simultaneously).
+ // If processing an event removes a watch, there is a possibility
+ // that the watched event will still be reported (if it has
+ // occurred) before pull_events() returns.
+ //
+ // do_wait - if false, returns immediately if no events are
+ // pending.
+ void pull_events(bool do_wait)
+ {
+ struct kevent events[16];
+ struct timespec ts;
+
+ Base::lock.lock();
+ sigset_t sigmask = this->get_active_sigmask();
+ Base::lock.unlock();
+
+ volatile bool was_signalled = false;
+
+ // using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is
+ // received during polling, it will longjmp back to here:
+ if (sigsetjmp(this->get_sigreceive_jmpbuf(), 1) != 0) {
+ this->process_signal(sigmask);
+ was_signalled = true;
+ }
+
+ if (was_signalled) {
+ do_wait = false;
+ }
+
+ ts.tv_sec = 0;
+ ts.tv_nsec = 0;
+
+ std::atomic_signal_fence(std::memory_order_release);
+ this->sigmaskf(SIG_UNBLOCK, &sigmask, nullptr);
+ int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts);
+ this->sigmaskf(SIG_BLOCK, &sigmask, nullptr);
+ if (r == -1 || r == 0) {
+ // signal or no events
+ return;
+ }
+
+ do {
+ process_events(events, r);
+ r = kevent(kqfd, nullptr, 0, events, 16, &ts);
+ } while (r > 0);
+ }
+};
+
+} // end namespace
#include <setjmp.h>
#include "dasynq-config.h"
+#include "dasynq-signal.h"
// "pselect"-based event loop mechanism.
//
template <class Base> class select_events;
-class select_traits
+class select_traits : public signal_traits
{
public:
- class sigdata_t
- {
- template <class Base> friend class select_events;
-
- siginfo_t info;
-
- public:
- // mandatory:
- int get_signo() { return info.si_signo; }
- int get_sicode() { return info.si_code; }
- pid_t get_sipid() { return info.si_pid; }
- uid_t get_siuid() { return info.si_uid; }
- void * get_siaddr() { return info.si_addr; }
- int get_sistatus() { return info.si_status; }
- int get_sival_int() { return info.si_value.sival_int; }
- void * get_sival_ptr() { return info.si_value.sival_ptr; }
-
- // XSI
- int get_sierrno() { return info.si_errno; }
-
- // XSR (streams) OB (obselete)
-#if !defined(__OpenBSD__)
- // Note: OpenBSD doesn't have this; most other systems do. Technically it is part of the STREAMS
- // interface.
- int get_siband() { return info.si_band; }
-#endif
-
- void set_signo(int signo) { info.si_signo = signo; }
- };
-
class fd_r;
// File descriptor optional storage. If the mechanism can return the file descriptor, this
constexpr static bool supports_non_oneshot_fd = false;
};
-namespace dprivate {
-namespace select_mech {
-
-// We need to declare and define a non-static data variable, "siginfo_p", in this header, without
-// violating the "one definition rule". The only way to do that is via a template, even though we
-// don't otherwise need a template here:
-template <typename T = decltype(nullptr)> class sig_capture_templ
-{
- public:
- static siginfo_t siginfo_cap;
- static sigjmp_buf rjmpbuf;
-
- static void signal_handler(int signo, siginfo_t *siginfo, void *v)
- {
- siginfo_cap = *siginfo;
- siglongjmp(rjmpbuf, 1);
- }
-};
-template <typename T> siginfo_t sig_capture_templ<T>::siginfo_cap;
-template <typename T> sigjmp_buf sig_capture_templ<T>::rjmpbuf;
-
-using sig_capture = sig_capture_templ<>;
-
-inline void prepare_signal(int signo)
-{
- struct sigaction the_action;
- the_action.sa_sigaction = sig_capture::signal_handler;
- the_action.sa_flags = SA_SIGINFO;
- sigfillset(&the_action.sa_mask);
-
- sigaction(signo, &the_action, nullptr);
-}
-
-inline sigjmp_buf &get_sigreceive_jmpbuf()
-{
- return sig_capture::rjmpbuf;
-}
-
-inline void unprep_signal(int signo)
-{
- signal(signo, SIG_DFL);
-}
-
-inline siginfo_t * get_siginfo()
-{
- return &sig_capture::siginfo_cap;
-}
-
-} } // namespace dasynq :: select_mech
-
-template <class Base> class select_events : public Base
+template <class Base> class select_events : public signal_events<Base>
{
fd_set read_set;
fd_set write_set;
//fd_set error_set; // logical OR of both the above
int max_fd = 0; // highest fd in any of the sets
- sigset_t active_sigmask; // mask out unwatched signals i.e. active=0
- void * sig_userdata[NSIG];
-
// userdata pointers in read and write respectively, for each fd:
std::vector<void *> rd_udata;
std::vector<void *> wr_udata;
// receive_signal(sigdata_t &, user *) noexcept
// receive_fd_event(fd_r, user *, int flags) noexcept
- using sigdata_t = select_traits::sigdata_t;
using fd_r = typename select_traits::fd_r;
void process_events(fd_set *read_set_p, fd_set *write_set_p, fd_set *error_set_p)
{
FD_ZERO(&read_set);
FD_ZERO(&write_set);
- sigfillset(&active_sigmask);
Base::init(this);
}
disable_fd_watch_nolock(fd, flags);
}
- // Note signal should be masked before call.
- void add_signal_watch(int signo, void *userdata)
- {
- std::lock_guard<decltype(Base::lock)> guard(Base::lock);
- add_signal_watch_nolock(signo, userdata);
- }
-
- // Note signal should be masked before call.
- void add_signal_watch_nolock(int signo, void *userdata)
- {
- sig_userdata[signo] = userdata;
- sigdelset(&active_sigmask, signo);
- dprivate::select_mech::prepare_signal(signo);
- }
-
- // Note, called with lock held:
- void rearm_signal_watch_nolock(int signo, void *userdata) noexcept
- {
- sig_userdata[signo] = userdata;
- sigdelset(&active_sigmask, signo);
- }
-
- void remove_signal_watch_nolock(int signo) noexcept
- {
- dprivate::select_mech::unprep_signal(signo);
- sigaddset(&active_sigmask, signo);
- sig_userdata[signo] = nullptr;
- // No need to signal other threads
- }
-
- void remove_signal_watch(int signo) noexcept
- {
- std::lock_guard<decltype(Base::lock)> guard(Base::lock);
- remove_signal_watch_nolock(signo);
- }
-
- public:
-
// If events are pending, process an unspecified number of them.
// If no events are pending, wait until one event is received and
// process this event (and possibly any other events received
// pending.
void pull_events(bool do_wait) noexcept
{
- using namespace dprivate::select_mech;
+ //using namespace dprivate::select_mech;
struct timespec ts;
ts.tv_sec = 0;
write_set_c = write_set;
err_set = read_set;
+ const sigset_t &active_sigmask = this->get_active_sigmask();
+
sigset_t sigmask;
this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask);
// This is horrible, but hopefully will be optimised well. POSIX gives no way to combine signal
// using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is
// received during polling, it will longjmp back to here:
- if (sigsetjmp(get_sigreceive_jmpbuf(), 1) != 0) {
- std::atomic_signal_fence(std::memory_order_acquire);
- auto * sinfo = get_siginfo();
- sigdata_t sigdata;
- sigdata.info = *sinfo;
- Base::lock.lock();
- void *udata = sig_userdata[sinfo->si_signo];
- if (udata != nullptr && Base::receive_signal(*this, sigdata, udata)) {
- sigaddset(&sigmask, sinfo->si_signo);
- sigaddset(&active_sigmask, sinfo->si_signo);
- }
- Base::lock.unlock();
+ if (sigsetjmp(this->get_sigreceive_jmpbuf(), 1) != 0) {
+ this->process_signal(sigmask);
was_signalled = true;
}
--- /dev/null
+#include <atomic>
+
+#include <signal.h>
+#include <setjmp.h>
+
+// Support for the standard POSIX signal mechanisms. This can be used by backends that don't
+// otherwise support receiving signals. It is not particularly nice (it involves using longjmp
+// out of a signal handler, which POSIX mildly frowns upon) but it's really the only viable way
+// to process signals together with file descriptor / other events and obtain the full siginfo_t
+// data passed to the signal handler.
+
+namespace dasynq {
+
+class signal_traits
+{
+ public:
+
+ class sigdata_t
+ {
+ template <typename, bool> friend class signal_events;
+
+ siginfo_t info;
+
+ public:
+ // mandatory:
+ int get_signo() { return info.si_signo; }
+ int get_sicode() { return info.si_code; }
+ pid_t get_sipid() { return info.si_pid; }
+ uid_t get_siuid() { return info.si_uid; }
+ void * get_siaddr() { return info.si_addr; }
+ int get_sistatus() { return info.si_status; }
+ int get_sival_int() { return info.si_value.sival_int; }
+ void * get_sival_ptr() { return info.si_value.sival_ptr; }
+
+ // XSI
+ int get_sierrno() { return info.si_errno; }
+
+ // XSR (streams) OB (obselete)
+#if !defined(__OpenBSD__)
+ // Note: OpenBSD doesn't have this; most other systems do. Technically it is part of the STREAMS
+ // interface.
+ int get_siband() { return info.si_band; }
+#endif
+
+ void set_signo(int signo) { info.si_signo = signo; }
+ };
+
+ constexpr static bool interrupt_after_signal_add = true;
+};
+
+namespace dprivate {
+namespace signal_mech {
+
+// We need to declare and define a non-static data variable, "siginfo_p", in this header, without
+// violating the "one definition rule". The only way to do that is via a template, even though we
+// don't otherwise need a template here:
+template <typename T = decltype(nullptr)> class sig_capture_templ
+{
+ public:
+ static siginfo_t siginfo_cap;
+ static sigjmp_buf rjmpbuf;
+
+ static void signal_handler(int signo, siginfo_t *siginfo, void *v)
+ {
+ siginfo_cap = *siginfo;
+ siglongjmp(rjmpbuf, 1);
+ }
+};
+template <typename T> siginfo_t sig_capture_templ<T>::siginfo_cap;
+template <typename T> sigjmp_buf sig_capture_templ<T>::rjmpbuf;
+
+using sig_capture = sig_capture_templ<>;
+
+inline void prepare_signal(int signo)
+{
+ struct sigaction the_action;
+ the_action.sa_sigaction = sig_capture::signal_handler;
+ the_action.sa_flags = SA_SIGINFO;
+ sigfillset(&the_action.sa_mask);
+
+ sigaction(signo, &the_action, nullptr);
+}
+
+inline sigjmp_buf &get_sigreceive_jmpbuf()
+{
+ return sig_capture::rjmpbuf;
+}
+
+inline void unprep_signal(int signo)
+{
+ signal(signo, SIG_DFL);
+}
+
+inline siginfo_t * get_siginfo()
+{
+ return &sig_capture::siginfo_cap;
+}
+
+} } // namespace dasynq :: signal_mech
+
+// signal_events template.
+//
+// Active (watched and enabled) signals are maintained as a signal mask, which either has active
+// signals in the mask or inactive signals in the mask, depending on the mask_enables parameter.
+// (if mask_enables is true, active signals are in the mask). Which is more convenient depends
+// exactly on how the mask will be used.
+//
+template <class Base, bool mask_enables = false> class signal_events : public Base
+{
+ sigset_t active_sigmask; // mask out unwatched signals i.e. active=0
+ void * sig_userdata[NSIG];
+
+ using sigdata_t = signal_traits::sigdata_t;
+
+ protected:
+
+ signal_events()
+ {
+ if (mask_enables) {
+ sigemptyset(&active_sigmask);
+ }
+ else {
+ sigfillset(&active_sigmask);
+ }
+ }
+
+ const sigset_t &get_active_sigmask()
+ {
+ return active_sigmask;
+ }
+
+ sigjmp_buf &get_sigreceive_jmpbuf()
+ {
+ return dprivate::signal_mech::get_sigreceive_jmpbuf();
+ }
+
+ void process_signal(sigset_t &sigmask)
+ {
+ using namespace dprivate::signal_mech;
+ std::atomic_signal_fence(std::memory_order_acquire);
+ auto * sinfo = get_siginfo();
+ sigdata_t sigdata;
+ sigdata.info = *sinfo;
+
+ Base::lock.lock();
+ void *udata = sig_userdata[sinfo->si_signo];
+ if (udata != nullptr && Base::receive_signal(*this, sigdata, udata)) {
+ if (mask_enables) {
+ sigdelset(&sigmask, sinfo->si_signo);
+ sigdelset(&active_sigmask, sinfo->si_signo);
+ }
+ else {
+ sigaddset(&sigmask, sinfo->si_signo);
+ sigaddset(&active_sigmask, sinfo->si_signo);
+ }
+ }
+ Base::lock.unlock();
+ }
+
+ public:
+
+ // Note signal should be masked before call.
+ void add_signal_watch(int signo, void *userdata)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ add_signal_watch_nolock(signo, userdata);
+ }
+
+ // Note signal should be masked before call.
+ void add_signal_watch_nolock(int signo, void *userdata)
+ {
+ sig_userdata[signo] = userdata;
+ if (mask_enables) {
+ sigaddset(&active_sigmask, signo);
+ }
+ else {
+ sigdelset(&active_sigmask, signo);
+ }
+ dprivate::signal_mech::prepare_signal(signo);
+ }
+
+ // Note, called with lock held:
+ void rearm_signal_watch_nolock(int signo, void *userdata) noexcept
+ {
+ sig_userdata[signo] = userdata;
+ if (mask_enables) {
+ sigaddset(&active_sigmask, signo);
+ }
+ else {
+ sigdelset(&active_sigmask, signo);
+ }
+ }
+
+ void remove_signal_watch_nolock(int signo) noexcept
+ {
+ dprivate::signal_mech::unprep_signal(signo);
+ if (mask_enables) {
+ sigdelset(&active_sigmask, signo);
+ }
+ else {
+ sigaddset(&active_sigmask, signo);
+ }
+ sig_userdata[signo] = nullptr;
+ // No need to signal other threads
+ }
+
+ void remove_signal_watch(int signo) noexcept
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ remove_signal_watch_nolock(signo);
+ }
+
+};
+
+}
#include "dasynq-interrupt.h"
#include "dasynq-util.h"
-// Dasynq uses a "mix-in" pattern to produce an event loop implementation incorporating selectable implementations of
-// various components (main backend, timers, child process watch mechanism etc). In C++ this can be achieved by
-// a template for some component which extends its own type parameter:
+// Dasynq uses a "mix-in" pattern to produce an event loop implementation incorporating selectable
+// implementations of various components (main backend, timers, child process watch mechanism etc). In C++
+// this can be achieved by a template for some component which extends its own type parameter:
//
// template <typename Base> class X : public B { .... }
//
-// We can chain several such components together (and so so below) to "mix in" the functionality of each into the final
-// class, eg:
+// (Note that in a sense this is actually the opposite of the so-called "Curiously Recurring Template"
+// pattern, which can be used to achieve a similar goal). We can chain several such components together to
+// "mix in" the functionality of each into the final class, eg:
//
-// template <typename T> using Loop = epoll_loop<interrupt_channel<timer_fd_events<child_proc_events<T>>>>;
+// template <typename T> using loop_t =
+// epoll_loop<interrupt_channel<timer_fd_events<child_proc_events<T>>>>;
//
-// (which defines an alias template "Loop", whose implementation will use the epoll backend, a standard interrupt channel
-// implementation, a timerfd-based timer implementation, and the standard child process watch implementation).
-// We sometimes need the base class to be able to call derived-class members: to do this we pass a reference to
-// the derived instance into a templated method in the base, called "init":
+// (which defines an alias template "loop_t", whose implementation will use the epoll backend, a standard
+// interrupt channel implementation, a timerfd-based timer implementation, and the standard child process
+// watch implementation). We sometimes need the base class to be able to call derived-class members: to do
+// this we pass a reference to the derived instance into a template member function in the base, for example
+// the "init" function:
//
// template <typename T> void init(T *derived)
// {
// Base::init(derived);
// }
//
-// At the base all this is the event_dispatch class, defined below, which receives event notifications and inserts
-// them into a queue for processing. The event_loop class, also below, wraps this (via composition) in an interface
-// which can be used to register/de-regsiter/enable/disable event watchers, and which can process the queued events
-// by calling the watcher callbacks. The event_loop class also provides some synchronisation to ensure thread-safety.
-
-#if DASYNQ_HAVE_KQUEUE
-#include "dasynq-kqueue.h"
+// The 'loop_t' defined above is a template for a usable backend mechanism for the event_loop template
+// class. At the base all this is the event_dispatch class, defined below, which receives event
+// notifications and inserts them into a queue for processing. The event_loop class, also below, wraps this
+// (via composition) in an interface which can be used to register/de-register/enable/disable event
+// watchers, and which can process the queued events by calling the watcher callbacks. The event_loop class
+// also provides some synchronisation to ensure thread-safety, and abstracts away some differences between
+// backends.
+//
+// The differences are exposed as traits, partly via a separate traits class (loop_traits_t as defined
+// below, which contains the "main" traits, particularly the sigdata_t, fd_r and fd_s types). Note that the
+// event_dispatch class exposes the loop traits as traits_t, and these are then potentially augmented at
+// each stage of the mechanism inheritance chain (i.e. the final traits are exposed as
+// `loop_t<event_dispatch>::traits_t'.
+//
+// The trait members are:
+// sigdata_t - a wrapper for the siginfo_t type or equivalent used to pass signal parameters
+// fd_r - a file descriptor wrapper, if the backend is able to retrieve the file descriptor when
+// it receives an fd event. Not all backends can do this.
+// fd_s - a file descriptor storage wrapper. If the backend can retrieve file descriptors, this
+// will be empty (and ideally zero-size), otherwise it stores a file descriptor.
+// With an fd_r and fd_s instance you can always retrieve the file descriptor:
+// `fdr.get_fd(fds)' will return it.
+// has_bidi_fd_watch
+// - boolean indicating whether a single watch can support watching for both input and output
+// events simultaneously
+// has_separate_rw_fd_watches
+// - boolean indicating whether it is possible to add separate input and output watches for the
+// same fd. Either this or has_bidi_fd_watch must be true.
+// interrupt_after_fd_add
+// - boolean indicating if a loop interrupt must be forced after adding/enabling an fd watch.
+// interrupt_after_signal_add
+// - boolean indicating if a loop interrupt must be forced after adding or enabling a signal
+// watch.
+// supports_non_oneshot_fd
+// - boolean; if true, event_dispatch can arm an fd watch without ONESHOT and returning zero
+// events from receive_fd_event (the event notification function) will leave the descriptor
+// armed. If false, all fd watches are effectively ONESHOT (they can be re-armed immediately
+// after delivery by returning an appropriate event flag mask).
+// full_timer_support
+// - boolean indicating that the monotonic and system clocks are actually different clocks and
+// that timers against the system clock will work correctly if the system clock time is
+// adjusted. If false, the monotic clock may not be present at all (monotonic clock will map
+// to system clock), and timers against either clock are not guaranteed to work correctly if
+// the system clock is adjusted.
+
+#if DASYNQ_HAVE_EPOLL <= 0
#if _POSIX_TIMERS > 0
#include "dasynq-posixtimer.h"
namespace dasynq {
template <typename T> using timer_events = itimer_events<T>;
}
#endif
+#endif
+
+#if DASYNQ_HAVE_KQUEUE
+#if DASYNQ_KQUEUE_MACOS_WORKAROUND
+#include "dasynq-kqueue-macos.h"
+#include "dasynq-childproc.h"
+namespace dasynq {
+ template <typename T> using loop_t = macos_kqueue_loop<interrupt_channel<timer_events<child_proc_events<T>>>>;
+ using loop_traits_t = macos_kqueue_traits;
+}
+#else
+#include "dasynq-kqueue.h"
#include "dasynq-childproc.h"
namespace dasynq {
template <typename T> using loop_t = kqueue_loop<interrupt_channel<timer_events<child_proc_events<T>>>>;
using loop_traits_t = kqueue_traits;
}
+#endif
#elif DASYNQ_HAVE_EPOLL
#include "dasynq-epoll.h"
#include "dasynq-timerfd.h"
typedef std::condition_variable_any condvar;
};
+ // For a single-threaded loop, the waitqueue is a no-op:
template <> class waitqueue_node<null_mutex>
{
// Specialised waitqueue_node for null_mutex.
}
};
+ // friend of event_loop for giving access to various private members
+ class loop_access {
+ public:
+ template <typename Loop>
+ static typename Loop::mutex_t &get_base_lock(Loop &loop) noexcept
+ {
+ return loop.get_base_lock();
+ }
+
+ template <typename Loop>
+ static rearm process_fd_rearm(Loop &loop, typename Loop::base_fd_watcher *bfw,
+ rearm rearm_type, bool is_multi_watch) noexcept
+ {
+ return loop.process_fd_rearm(bfw, rearm_type, is_multi_watch);
+ }
+
+ template <typename Loop>
+ static rearm process_secondary_rearm(Loop &loop, typename Loop::base_bidi_fd_watcher * bdfw,
+ base_watcher * outw, rearm rearm_type) noexcept
+ {
+ return loop.process_secondary_rearm(bdfw, outw, rearm_type);
+ }
+
+ template <typename Loop>
+ static void process_signal_rearm(Loop &loop, typename Loop::base_signal_watcher * bsw,
+ rearm rearm_type) noexcept
+ {
+ loop.process_signal_rearm(bsw, rearm_type);
+ }
+
+ template <typename Loop>
+ static void process_child_watch_rearm(Loop &loop, typename Loop::base_child_watcher *bcw,
+ rearm rearm_type) noexcept
+ {
+ loop.process_child_watch_rearm(bcw, rearm_type);
+ }
+
+ template <typename Loop>
+ static void process_timer_rearm(Loop &loop, typename Loop::base_timer_watcher *btw,
+ rearm rearm_type) noexcept
+ {
+ loop.process_timer_rearm(btw, rearm_type);
+ }
+
+ template <typename Loop>
+ static void requeue_watcher(Loop &loop, base_watcher *watcher) noexcept
+ {
+ loop.requeue_watcher(watcher);
+ }
+ };
+
// Do standard post-dispatch processing for a watcher. This handles the case of removing or
- // re-queing watchers depending on the rearm type.
+ // re-queueing watchers depending on the rearm type.
template <typename Loop> void post_dispatch(Loop &loop, base_watcher *watcher, rearm rearm_type)
{
if (rearm_type == rearm::REMOVE) {
- loop.get_base_lock().unlock();
+ loop_access::get_base_lock(loop).unlock();
watcher->watch_removed();
- loop.get_base_lock().lock();
+ loop_access::get_base_lock(loop).lock();
}
else if (rearm_type == rearm::REQUEUE) {
- loop.requeue_watcher(watcher);
+ loop_access::requeue_watcher(loop, watcher);
}
}
- // This class serves as the base class (mixin) for the AEN mechanism class.
+ // This class serves as the base class (mixin) for the backend mechanism.
//
// The event_dispatch class maintains the queued event data structures. It inserts watchers
// into the queue when events are received (receiveXXX methods). It also owns a mutex used
// In general the methods should be called with lock held. In practice this means that the
// event loop backend implementations must obtain the lock; they are also free to use it to
// protect their own internal data structures.
- template <typename T_Mutex, typename Traits, typename LoopTraits> class event_dispatch
+ template <typename Traits, typename LoopTraits> class event_dispatch
{
- friend class dasynq::event_loop<T_Mutex, LoopTraits>;;
+ friend class dasynq::event_loop<typename LoopTraits::mutex_t, LoopTraits>;;
public:
- using mutex_t = T_Mutex;
+ using mutex_t = typename LoopTraits::mutex_t;
using traits_t = Traits;
private:
// queue data structure/pointer
prio_queue event_queue;
- using base_signal_watcher = dasynq::dprivate::base_signal_watcher<mutex_t, typename traits_t::sigdata_t>;
- using base_fd_watcher = dasynq::dprivate::base_fd_watcher<mutex_t>;
- using base_bidi_fd_watcher = dasynq::dprivate::base_bidi_fd_watcher<mutex_t>;
- using base_child_watcher = dasynq::dprivate::base_child_watcher<mutex_t>;
- using base_timer_watcher = dasynq::dprivate::base_timer_watcher<mutex_t>;
+ using base_signal_watcher = dprivate::base_signal_watcher<typename traits_t::sigdata_t>;
+ using base_child_watcher = dprivate::base_child_watcher;
+ using base_timer_watcher = dprivate::base_timer_watcher;
// Add a watcher into the queueing system (but don't queue it). Call with lock held.
// may throw: std::bad_alloc
}
}
- // Queue a watcher for reomval, or issue "removed" callback to it.
+ // Queue a watcher for removal, or issue "removed" callback to it.
// Call with lock free.
void issue_delete(base_bidi_fd_watcher *watcher) noexcept
{
event_dispatch() { }
event_dispatch(const event_dispatch &) = delete;
};
+
}
-// This is the main event_loop implementation. It serves as an interface to the event loop
-// backend (of which it maintains an internal instance). It also serialises waits the backend
-// and provides safe deletion of watchers (see comments inline).
+// This is the main event_loop implementation. It serves as an interface to the event loop backend (of which
+// it maintains an internal instance). It also serialises polling the backend and provides safe deletion of
+// watchers (see comments inline).
+//
+// The T_Mutex type parameter specifies the mutex type. A null_mutex can be used for a single-threaded event
+// loop; std::mutex, or any mutex providing a compatible interface, can be used for a thread-safe event
+// loop.
+//
+// The Traits type parameter specifies any required traits for the event loop. This specifies the back-end
+// to use (backend_t, a template) and the basic back-end traits (backend_traits_t).
+// The default is `default_traits<T_Mutex>'.
+//
template <typename T_Mutex, typename Traits>
class event_loop
{
using my_event_loop_t = event_loop<T_Mutex, Traits>;
+
friend class dprivate::fd_watcher<my_event_loop_t>;
friend class dprivate::bidi_fd_watcher<my_event_loop_t>;
friend class dprivate::signal_watcher<my_event_loop_t>;
friend class dprivate::child_proc_watcher<my_event_loop_t>;
friend class dprivate::timer<my_event_loop_t>;
- friend void dprivate::post_dispatch<my_event_loop_t>(my_event_loop_t &loop,
- dprivate::base_watcher *watcher, rearm rearm_type);
-
- template <typename, typename> friend class dprivate::fd_watcher_impl;
- template <typename, typename> friend class dprivate::bidi_fd_watcher_impl;
- template <typename, typename> friend class dprivate::signal_watcher_impl;
- template <typename, typename> friend class dprivate::child_proc_watcher_impl;
- template <typename, typename> friend class dprivate::timer_impl;
+ friend class dprivate::loop_access;
using backend_traits_t = typename Traits::backend_traits_t;
- template <typename T, typename U> using event_dispatch = dprivate::event_dispatch<T,U,Traits>;
- using loop_mech_t = typename Traits::template backend_t<event_dispatch<T_Mutex, backend_traits_t>>;
+ template <typename T> using event_dispatch = dprivate::event_dispatch<T,Traits>;
+ using dispatch_t = event_dispatch<backend_traits_t>;
+ using loop_mech_t = typename Traits::template backend_t<dispatch_t>;
using reaper_mutex_t = typename loop_mech_t::reaper_mutex_t;
public:
template <typename T> using waitqueue = dprivate::waitqueue<T>;
template <typename T> using waitqueue_node = dprivate::waitqueue_node<T>;
using base_watcher = dprivate::base_watcher;
- using base_signal_watcher = dprivate::base_signal_watcher<T_Mutex, typename loop_traits_t::sigdata_t>;
- using base_fd_watcher = dprivate::base_fd_watcher<T_Mutex>;
- using base_bidi_fd_watcher = dprivate::base_bidi_fd_watcher<T_Mutex>;
- using base_child_watcher = dprivate::base_child_watcher<T_Mutex>;
- using base_timer_watcher = dprivate::base_timer_watcher<T_Mutex>;
+ using base_signal_watcher = dprivate::base_signal_watcher<typename loop_traits_t::sigdata_t>;
+ using base_fd_watcher = dprivate::base_fd_watcher;
+ using base_bidi_fd_watcher = dprivate::base_bidi_fd_watcher;
+ using base_child_watcher = dprivate::base_child_watcher;
+ using base_timer_watcher = dprivate::base_timer_watcher;
using watch_type_t = dprivate::watch_type_t;
loop_mech_t loop_mech;
void register_signal(base_signal_watcher *callBack, int signo)
{
- auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
- std::lock_guard<mutex_t> guard(ed.lock);
+ std::lock_guard<mutex_t> guard(loop_mech.lock);
loop_mech.prepare_watcher(callBack);
try {
waitqueue_node<T_Mutex> qnode;
get_attn_lock(qnode);
- auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
- ed.issue_delete(callBack);
+ loop_mech.issue_delete(callBack);
release_lock(qnode);
}
void register_fd(base_fd_watcher *callback, int fd, int eventmask, bool enabled, bool emulate = false)
{
- auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
- std::lock_guard<mutex_t> guard(ed.lock);
+ std::lock_guard<mutex_t> guard(loop_mech.lock);
loop_mech.prepare_watcher(callback);
}
}
+ // Register a bidi fd watcher. The watch_flags should already be set to the eventmask to watch
+ // (i.e. eventmask == callback->watch_flags is a pre-condition).
void register_fd(base_bidi_fd_watcher *callback, int fd, int eventmask, bool emulate = false)
{
- auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
- std::lock_guard<mutex_t> guard(ed.lock);
+ std::lock_guard<mutex_t> guard(loop_mech.lock);
loop_mech.prepare_watcher(callback);
try {
if (r & IN_EVENTS) {
callback->emulatefd = true;
if (eventmask & IN_EVENTS) {
+ callback->watch_flags &= ~IN_EVENTS;
requeue_watcher(callback);
}
}
if (r & OUT_EVENTS) {
callback->out_watcher.emulatefd = true;
if (eventmask & OUT_EVENTS) {
+ callback->watch_flags &= ~OUT_EVENTS;
requeue_watcher(&callback->out_watcher);
}
}
callback->emulatefd = true;
callback->out_watcher.emulatefd = true;
if (eventmask & IN_EVENTS) {
+ callback->watch_flags &= ~IN_EVENTS;
requeue_watcher(callback);
}
if (eventmask & OUT_EVENTS) {
+ callback->watch_flags &= ~OUT_EVENTS;
requeue_watcher(&callback->out_watcher);
}
}
void deregister(base_fd_watcher *callback, int fd) noexcept
{
if (callback->emulatefd) {
- auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
+ auto & ed = (dispatch_t &) loop_mech;
ed.issue_delete(callback);
return;
}
waitqueue_node<T_Mutex> qnode;
get_attn_lock(qnode);
- auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
+ auto & ed = (dispatch_t &) loop_mech;
ed.issue_delete(callback);
release_lock(qnode);
waitqueue_node<T_Mutex> qnode;
get_attn_lock(qnode);
- event_dispatch<T_Mutex, backend_traits_t> & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
+ dispatch_t & ed = (dispatch_t &) loop_mech;
ed.issue_delete(callback);
release_lock(qnode);
void reserve_child_watch(base_child_watcher *callback)
{
- auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
- std::lock_guard<mutex_t> guard(ed.lock);
+ std::lock_guard<mutex_t> guard(loop_mech.lock);
loop_mech.prepare_watcher(callback);
try {
void unreserve(base_child_watcher *callback) noexcept
{
- auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
- std::lock_guard<mutex_t> guard(ed.lock);
+ std::lock_guard<mutex_t> guard(loop_mech.lock);
loop_mech.unreserve_child_watch(callback->watch_handle);
loop_mech.release_watcher(callback);
void register_child(base_child_watcher *callback, pid_t child)
{
- auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
- std::lock_guard<mutex_t> guard(ed.lock);
+ std::lock_guard<mutex_t> guard(loop_mech.lock);
loop_mech.prepare_watcher(callback);
try {
waitqueue_node<T_Mutex> qnode;
get_attn_lock(qnode);
- event_dispatch<T_Mutex, backend_traits_t> & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
- ed.issue_delete(callback);
+ loop_mech.issue_delete(callback);
release_lock(qnode);
}
void register_timer(base_timer_watcher *callback, clock_type clock)
{
- auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
- std::lock_guard<mutex_t> guard(ed.lock);
+ std::lock_guard<mutex_t> guard(loop_mech.lock);
loop_mech.prepare_watcher(callback);
try {
waitqueue_node<T_Mutex> qnode;
get_attn_lock(qnode);
- event_dispatch<T_Mutex, backend_traits_t> & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
- ed.issue_delete(callback);
+ loop_mech.issue_delete(callback);
release_lock(qnode);
}
void requeue_watcher(base_watcher *watcher) noexcept
{
loop_mech.queue_watcher(watcher);
-
- // We need to signal any thread that is currently waiting on the loop mechanism, so that it wakes
- // and processes the newly queued watcher:
-
- wait_lock.lock();
- bool attn_q_empty = attn_waitqueue.is_empty();
- wait_lock.unlock();
-
- if (! attn_q_empty) {
- loop_mech.interrupt_wait();
- }
+ interrupt_if_necessary();
}
// Interrupt the current poll-waiter, if necessary - that is, if the loop is multi-thread safe, and if
// there is currently another thread polling the backend event mechanism.
void interrupt_if_necessary()
{
- std::lock_guard<mutex_t> guard(wait_lock);
- if (! attn_waitqueue.is_empty()) { // (always false for single-threaded loops)
+ wait_lock.lock();
+ bool attn_q_empty = attn_waitqueue.is_empty(); // (always false for single-threaded loops)
+ wait_lock.unlock();
+
+ if (! attn_q_empty) {
loop_mech.interrupt_wait();
}
}
}
}
else if (rearm_type == rearm::REARM) {
- bdfw->watch_flags |= IN_EVENTS;
-
if (! emulatedfd) {
+ bdfw->watch_flags |= IN_EVENTS;
if (! backend_traits_t::has_separate_rw_fd_watches) {
int watch_flags = bdfw->watch_flags;
set_fd_enabled_nolock(bdfw, bdfw->watch_fd,
}
}
else {
+ bdfw->watch_flags &= ~IN_EVENTS;
rearm_type = rearm::REQUEUE;
}
}
else if (rearm_type == rearm::NOOP) {
if (bdfw->emulatefd) {
if (bdfw->watch_flags & IN_EVENTS) {
+ bdfw->watch_flags &= ~IN_EVENTS;
rearm_type = rearm::REQUEUE;
}
}
bdfw->watch_flags &= ~OUT_EVENTS;
}
else if (rearm_type == rearm::REARM) {
- bdfw->watch_flags |= OUT_EVENTS;
+ bdfw->watch_flags &= ~OUT_EVENTS;
rearm_type = rearm::REQUEUE;
}
else if (rearm_type == rearm::NOOP) {
if (bdfw->watch_flags & OUT_EVENTS) {
+ bdfw->watch_flags &= ~OUT_EVENTS;
rearm_type = rearm::REQUEUE;
}
}
// no limit.
bool process_events(int limit) noexcept
{
- auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
- ed.lock.lock();
+ loop_mech.lock.lock();
if (limit == 0) {
return false;
}
- base_watcher * pqueue = ed.pull_event();
+ base_watcher * pqueue = loop_mech.pull_event();
bool active = false;
while (pqueue != nullptr) {
// issue a secondary dispatch:
bbfw->dispatch_second(this);
- pqueue = ed.pull_event();
+ pqueue = loop_mech.pull_event();
continue;
}
limit--;
if (limit == 0) break;
}
- pqueue = ed.pull_event();
+ pqueue = loop_mech.pull_event();
}
- ed.lock.unlock();
+ loop_mech.lock.unlock();
return active;
}
// Posix signal event watcher
template <typename EventLoop>
-class signal_watcher : private dprivate::base_signal_watcher<typename EventLoop::mutex_t, typename EventLoop::loop_traits_t::sigdata_t>
+class signal_watcher : private dprivate::base_signal_watcher<typename EventLoop::loop_traits_t::sigdata_t>
{
template <typename, typename> friend class signal_watcher_impl;
void dispatch(void *loop_ptr) noexcept override
{
EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
- loop.get_base_lock().unlock();
+ loop_access::get_base_lock(loop).unlock();
auto rearm_type = static_cast<Derived *>(this)->received(loop, this->siginfo.get_signo(), this->siginfo);
- loop.get_base_lock().lock();
+ loop_access::get_base_lock(loop).lock();
if (rearm_type != rearm::REMOVED) {
rearm_type = rearm::REMOVE;
}
- loop.process_signal_rearm(this, rearm_type);
+ loop_access::process_signal_rearm(loop, this, rearm_type);
post_dispatch(loop, this, rearm_type);
}
// Posix file descriptor event watcher
template <typename EventLoop>
-class fd_watcher : private dprivate::base_fd_watcher<typename EventLoop::mutex_t>
+class fd_watcher : private dprivate::base_fd_watcher
{
template <typename, typename> friend class fd_watcher_impl;
{
std::lock_guard<mutex_t> guard(eloop.get_base_lock());
if (this->emulatefd) {
+ if (enable && ! this->emulate_enabled) {
+ loop_access::requeue_watcher(eloop, this);
+ }
this->emulate_enabled = enable;
}
else {
eloop.set_fd_enabled_nolock(this, this->watch_fd, this->watch_flags, enable);
}
+
if (! enable) {
eloop.dequeue_watcher(this);
}
// In case emulating, clear enabled here; REARM or explicit set_enabled will re-enable.
this->emulate_enabled = false;
- loop.get_base_lock().unlock();
+ loop_access::get_base_lock(loop).unlock();
auto rearm_type = static_cast<Derived *>(this)->fd_event(loop, this->watch_fd, this->event_flags);
- loop.get_base_lock().lock();
+ loop_access::get_base_lock(loop).lock();
if (rearm_type != rearm::REMOVED) {
this->event_flags = 0;
rearm_type = rearm::REMOVE;
}
- rearm_type = loop.process_fd_rearm(this, rearm_type, false);
+ rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type, false);
post_dispatch(loop, this, rearm_type);
}
// This watcher type has two event notification methods which can both potentially be
// active at the same time.
template <typename EventLoop>
-class bidi_fd_watcher : private dprivate::base_bidi_fd_watcher<typename EventLoop::mutex_t>
+class bidi_fd_watcher : private dprivate::base_bidi_fd_watcher
{
template <typename, typename> friend class bidi_fd_watcher_impl;
void set_watch_enabled(EventLoop &eloop, bool in, bool b)
{
int events = in ? IN_EVENTS : OUT_EVENTS;
+ auto orig_flags = this->watch_flags;
if (b) {
this->watch_flags |= events;
(this->watch_flags & IO_EVENTS) != 0);
}
}
+ else {
+ // emulation: if enabling a previously disabled watcher, must queue now:
+ if (b && (orig_flags != this->watch_flags)) {
+ this->watch_flags = orig_flags;
+ loop_access::requeue_watcher(eloop, watcher);
+ }
+ }
if (! b) {
eloop.dequeue_watcher(watcher);
{
EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
this->emulate_enabled = false;
- loop.get_base_lock().unlock();
+ loop_access::get_base_lock(loop).unlock();
auto rearm_type = static_cast<Derived *>(this)->read_ready(loop, this->watch_fd);
- loop.get_base_lock().lock();
+ loop_access::get_base_lock(loop).lock();
if (rearm_type != rearm::REMOVED) {
this->event_flags &= ~IN_EVENTS;
rearm_type = rearm::REMOVE;
}
- rearm_type = loop.process_fd_rearm(this, rearm_type, true);
+ rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type, true);
post_dispatch(loop, this, rearm_type);
}
auto &outwatcher = bidi_fd_watcher<EventLoop>::out_watcher;
EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
- loop.get_base_lock().unlock();
+ loop_access::get_base_lock(loop).unlock();
auto rearm_type = static_cast<Derived *>(this)->write_ready(loop, this->watch_fd);
- loop.get_base_lock().lock();
+ loop_access::get_base_lock(loop).lock();
if (rearm_type != rearm::REMOVED) {
this->event_flags &= ~OUT_EVENTS;
rearm_type = rearm::REMOVE;
}
- rearm_type = loop.process_secondary_rearm(this, &outwatcher, rearm_type);
+ rearm_type = loop_access::process_secondary_rearm(loop, this, &outwatcher, rearm_type);
if (rearm_type == rearm::REQUEUE) {
post_dispatch(loop, &outwatcher, rearm_type);
// Child process event watcher
template <typename EventLoop>
-class child_proc_watcher : private dprivate::base_child_watcher<typename EventLoop::mutex_t>
+class child_proc_watcher : private dprivate::base_child_watcher
{
template <typename, typename> friend class child_proc_watcher_impl;
void dispatch(void *loop_ptr) noexcept override
{
EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
- loop.get_base_lock().unlock();
+ loop_access::get_base_lock(loop).unlock();
auto rearm_type = static_cast<Derived *>(this)->status_change(loop, this->watch_pid, this->child_status);
- loop.get_base_lock().lock();
+ loop_access::get_base_lock(loop).lock();
if (rearm_type != rearm::REMOVED) {
rearm_type = rearm::REMOVE;
}
- loop.process_child_watch_rearm(this, rearm_type);
+ loop_access::process_child_watch_rearm(loop, this, rearm_type);
// rearm_type = loop.process??;
post_dispatch(loop, this, rearm_type);
};
template <typename EventLoop>
-class timer : private base_timer_watcher<typename EventLoop::mutex_t>
+class timer : private base_timer_watcher
{
template <typename, typename> friend class timer_impl;
- using base_t = base_timer_watcher<typename EventLoop::mutex_t>;
+ using base_t = base_timer_watcher;
using mutex_t = typename EventLoop::mutex_t;
public:
void dispatch(void *loop_ptr) noexcept override
{
EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
- loop.get_base_lock().unlock();
+ loop_access::get_base_lock(loop).unlock();
auto intervals_report = this->intervals;
this->intervals = 0;
auto rearm_type = static_cast<Derived *>(this)->timer_expiry(loop, intervals_report);
- loop.get_base_lock().lock();
+ loop_access::get_base_lock(loop).lock();
if (rearm_type != rearm::REMOVED) {
rearm_type = rearm::REMOVE;
}
- loop.process_timer_rearm(this, rearm_type);
+ loop_access::process_timer_rearm(loop, this, rearm_type);
post_dispatch(loop, this, rearm_type);
}