From af95565cd113fc44b40667ce328596eba4bfbfaa Mon Sep 17 00:00:00 2001 From: Davin McCall Date: Tue, 12 Dec 2017 05:45:12 +1100 Subject: [PATCH] Incorporate upstream changes from Dasynq. This updates bundled Dasynq to 1.0 (plus minor changes since). --- src/dasynq/dasynq-basewatchers.h | 226 ++++++ src/dasynq/dasynq-btree_set.h | 5 +- src/dasynq/dasynq-childproc.h | 71 +- src/dasynq/dasynq-config.h | 51 +- src/dasynq/dasynq-epoll.h | 120 +-- src/dasynq/dasynq-interrupt.h | 23 +- src/dasynq/dasynq-itimer.h | 188 ++--- src/dasynq/dasynq-kqueue.h | 348 +++++---- src/dasynq/dasynq-mutex.h | 39 +- src/dasynq/dasynq-naryheap.h | 17 +- src/dasynq/dasynq-posixtimer.h | 115 +-- src/dasynq/dasynq-svec.h | 184 ----- src/dasynq/dasynq-timerbase.h | 277 +++++-- src/dasynq/dasynq-timerfd.h | 140 +--- src/dasynq/dasynq-util.h | 41 + src/dasynq/dasynq.h | 1195 ++++++++++++++---------------- src/service.cc | 2 +- 17 files changed, 1549 insertions(+), 1493 deletions(-) create mode 100644 src/dasynq/dasynq-basewatchers.h delete mode 100644 src/dasynq/dasynq-svec.h create mode 100644 src/dasynq/dasynq-util.h diff --git a/src/dasynq/dasynq-basewatchers.h b/src/dasynq/dasynq-basewatchers.h new file mode 100644 index 0000000..cbabe91 --- /dev/null +++ b/src/dasynq/dasynq-basewatchers.h @@ -0,0 +1,226 @@ +// Dasynq: early declarations and base watchers. +// +// Here we define watcher functionality that is not dependent on the event loop type. In particular, +// base classes for the various watcher types. These are not part of the public API. +// +// In general access to the members of the basewatcher should be protected by a mutex. The +// event_dispatch lock is used for this purpose. + +namespace dasynq { + +namespace dprivate { + // POSIX says that sigprocmask has unspecified behaviour if used in a multi-threaded process. We can use + // pthread_sigmask instead, but that may require linking with the threads library. This function is + // specialised to call one or the other depending on the mutex type: + template void sigmaskf(int how, const sigset_t *set, sigset_t *oset) + { + sigprocmask(how, set, oset); + } + + template <> inline void sigmaskf(int how, const sigset_t *set, sigset_t *oset) + { + pthread_sigmask(how, set, oset); + } +} + +// A template to generate suitable default loop traits for a given type of mutex: +template class default_traits +{ + public: + using mutex_t = T_Mutex; + template using backend_t = dasynq::loop_t; + using backend_traits_t = dasynq::loop_traits_t; + + // Alter the current thread signal mask using the correct function + // (sigprocmask or pthread_sigmask): + static void sigmaskf(int how, const sigset_t *set, sigset_t *oset) + { + dprivate::sigmaskf(how, set, oset); + } +}; + +// Forward declarations: +template > +class event_loop; + +inline namespace { + constexpr int DEFAULT_PRIORITY = 50; +} + +namespace dprivate { + // (non-public API) + + class base_watcher; + using prio_queue = NaryHeap; + + template class fd_watcher; + template class bidi_fd_watcher; + template class signal_watcher; + template class child_proc_watcher; + template class timer; + + template class fd_watcher_impl; + template class bidi_fd_watcher_impl; + template class signal_watcher_impl; + template class child_proc_watcher_impl; + template class timer_impl; + + enum class watch_type_t + { + SIGNAL, + FD, + CHILD, + SECONDARYFD, + TIMER + }; + + template class event_dispatch; + + // For FD watchers: + // Use this watch flag to indicate that in and out events should be reported separately, + // that is, watcher should not be disabled until all watched event types are queued. + constexpr static int multi_watch = 4; + + // Represents a queued event notification. Various event watchers derive from this type. + class base_watcher + { + public: + watch_type_t watchType; + int active : 1; // currently executing handler? + int deleteme : 1; // delete when handler finished? + int emulatefd : 1; // emulate file watch (by re-queueing) + int emulate_enabled : 1; // whether an emulated watch is enabled + int child_termd : 1; // child process has terminated + + prio_queue::handle_t heap_handle; + int priority; + + static void set_priority(base_watcher &p, int prio) + { + p.priority = prio; + } + + // Perform initialisation necessary before registration with an event loop + void init() + { + active = false; + deleteme = false; + emulatefd = false; + emulate_enabled = false; + child_termd = false; + prio_queue::init_handle(heap_handle); + priority = DEFAULT_PRIORITY; + } + + base_watcher(watch_type_t wt) noexcept : watchType(wt) { } + base_watcher(const base_watcher &) = delete; + base_watcher &operator=(const base_watcher &) = delete; + + // The dispatch function is called to process a watcher's callback. It is the "real" callback + // function; it usually delegates to a user-provided callback. + virtual void dispatch(void *loop_ptr) noexcept { }; + + // Bi-directional file descriptor watches have a secondary dispatch function for the secondary + // watcher (i.e. the output watcher): + virtual void dispatch_second(void *loop_ptr) noexcept { } + + virtual ~base_watcher() noexcept { } + + // Called when the watcher has been removed. + // It is guaranteed by the caller that: + // - the dispatch method is not currently running + // - the dispatch method will not be called. + virtual void watch_removed() noexcept + { + // Later: the "delete" behaviour could be dependent on a flag, perhaps? + // delete this; + } + }; + + // Base signal event - not part of public API + template + class base_signal_watcher : public base_watcher + { + template friend class event_dispatch; + template friend class dasynq::event_loop; + + protected: + T_Sigdata siginfo; + base_signal_watcher() : base_watcher(watch_type_t::SIGNAL) { } + + public: + using siginfo_t = T_Sigdata; + typedef siginfo_t &siginfo_p; + }; + + template + class base_fd_watcher : public base_watcher + { + template friend class event_dispatch; + template friend class dasynq::event_loop; + + protected: + int watch_fd; + + // These flags are protected by the loop's internal lock: + int watch_flags; // events being watched + int event_flags; // events pending (queued) + + // watch_flags: for a regular fd_watcher, this specifies the events that the watcher + // is watching (or was watching if disabled). For a bidi_fd_watcher, specifies + // the events that the watcher is currently watching (i.e. specifies which + // halves of the Bidi watcher are enabled). + + base_fd_watcher() noexcept : base_watcher(watch_type_t::FD) { } + }; + + template + class base_bidi_fd_watcher : public base_fd_watcher + { + template friend class event_dispatch; + template friend class dasynq::event_loop; + + protected: + + // The main instance is the "input" watcher only; we keep a secondary watcher with a secondary set + // of flags for the "output" watcher. Note that some of the flags in the secondary watcher aren't + // used; it exists mainly so that it can be queued independently of the primary watcher. + base_watcher out_watcher {watch_type_t::SECONDARYFD}; + + int read_removed : 1; // read watch removed? + int write_removed : 1; // write watch removed? + }; + + template + class base_child_watcher : public base_watcher + { + template friend class event_dispatch; + template friend class dasynq::event_loop; + + protected: + pid_watch_handle_t watch_handle; + pid_t watch_pid; + int child_status; + + base_child_watcher() : base_watcher(watch_type_t::CHILD) { } + }; + + + template + class base_timer_watcher : public base_watcher + { + template friend class event_dispatch; + template friend class dasynq::event_loop; + + protected: + timer_handle_t timer_handle; + int intervals; + clock_type clock; + + base_timer_watcher() : base_watcher(watch_type_t::TIMER) + { + init_timer_handle(timer_handle); + } + }; +} // dprivate +} // dasynq diff --git a/src/dasynq/dasynq-btree_set.h b/src/dasynq/dasynq-btree_set.h index 80885e9..2e2c090 100644 --- a/src/dasynq/dasynq-btree_set.h +++ b/src/dasynq/dasynq-btree_set.h @@ -1,7 +1,6 @@ #ifndef DASYNQ_BTREE_SET_H #define DASYNQ_BTREE_SET_H -#include #include namespace dasynq { @@ -577,6 +576,10 @@ class btree_set ~btree_set() { + while (left_sept != nullptr) { + remove(*(left_sept->hn_p[0])); + } + while (sn_reserve != nullptr) { auto *next = sn_reserve->parent; delete sn_reserve; diff --git a/src/dasynq/dasynq-childproc.h b/src/dasynq/dasynq-childproc.h index 8d364f7..f3f5e7f 100644 --- a/src/dasynq/dasynq-childproc.h +++ b/src/dasynq/dasynq-childproc.h @@ -3,6 +3,8 @@ namespace dasynq { +namespace dprivate { + // Map of pid_t to void *, with possibility of reserving entries so that mappings can // be later added with no danger of allocator exhaustion (bad_alloc). class pid_map @@ -76,28 +78,42 @@ inline void sigchld_handler(int signum) // hurt in any case). } -using pid_watch_handle_t = pid_map::pid_handle_t; +} // dprivate namespace + +using pid_watch_handle_t = dprivate::pid_map::pid_handle_t; -template class ChildProcEvents : public Base +template class child_proc_events : public Base { + public: + using reaper_mutex_t = typename Base::mutex_t; + + class traits_t : public Base::traits_t + { + public: + constexpr static bool supports_childwatch_reservation = true; + }; + private: - pid_map child_waiters; + dprivate::pid_map child_waiters; + reaper_mutex_t reaper_lock; // used to prevent reaping while trying to signal a process protected: - using SigInfo = typename Base::SigInfo; + using sigdata_t = typename traits_t::sigdata_t; template - bool receive_signal(T & loop_mech, SigInfo &siginfo, void *userdata) + bool receive_signal(T & loop_mech, sigdata_t &siginfo, void *userdata) { if (siginfo.get_signo() == SIGCHLD) { int status; pid_t child; + reaper_lock.lock(); while ((child = waitpid(-1, &status, WNOHANG)) > 0) { - pid_map::entry ent = child_waiters.remove(child); + auto ent = child_waiters.remove(child); if (ent.first) { - Base::receiveChildStat(child, status, ent.second); + Base::receive_child_stat(child, status, ent.second); } } + reaper_lock.unlock(); return false; // leave signal watch enabled } else { @@ -106,36 +122,34 @@ template class ChildProcEvents : public Base } public: - void reserveChildWatch(pid_watch_handle_t &handle) + void reserve_child_watch_nolock(pid_watch_handle_t &handle) { - std::lock_guard guard(Base::lock); child_waiters.reserve(handle); } - void unreserveChildWatch(pid_watch_handle_t &handle) noexcept + void unreserve_child_watch(pid_watch_handle_t &handle) noexcept { std::lock_guard guard(Base::lock); - unreserveChildWatch_nolock(handle); + unreserve_child_watch_nolock(handle); } - void unreserveChildWatch_nolock(pid_watch_handle_t &handle) noexcept + void unreserve_child_watch_nolock(pid_watch_handle_t &handle) noexcept { child_waiters.unreserve(handle); } - void addChildWatch(pid_watch_handle_t &handle, pid_t child, void *val) + void add_child_watch_nolock(pid_watch_handle_t &handle, pid_t child, void *val) { - std::lock_guard guard(Base::lock); child_waiters.add(handle, child, val); } - void addReservedChildWatch(pid_watch_handle_t &handle, pid_t child, void *val) noexcept + void add_reserved_child_watch(pid_watch_handle_t &handle, pid_t child, void *val) noexcept { std::lock_guard guard(Base::lock); child_waiters.add_from_reserve(handle, child, val); } - void addReservedChildWatch_nolock(pid_watch_handle_t &handle, pid_t child, void *val) noexcept + void add_reserved_child_watch_nolock(pid_watch_handle_t &handle, pid_t child, void *val) noexcept { child_waiters.add_from_reserve(handle, child, val); } @@ -147,26 +161,41 @@ template class ChildProcEvents : public Base child_waiters.remove(handle); } - void removeChildWatch(pid_watch_handle_t &handle) noexcept + void remove_child_watch(pid_watch_handle_t &handle) noexcept { std::lock_guard guard(Base::lock); - removeChildWatch_nolock(handle); + remove_child_watch_nolock(handle); } - void removeChildWatch_nolock(pid_watch_handle_t &handle) noexcept + void remove_child_watch_nolock(pid_watch_handle_t &handle) noexcept { child_waiters.remove(handle); child_waiters.unreserve(handle); } + // Get the reaper lock, which can be used to ensure that a process is not reaped while attempting to + // signal it. + reaper_mutex_t &get_reaper_lock() noexcept + { + return reaper_lock; + } + template void init(T *loop_mech) { + // Mask SIGCHLD: + sigset_t sigmask; + this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask); + sigaddset(&sigmask, SIGCHLD); + this->sigmaskf(SIG_SETMASK, &sigmask, nullptr); + + // On some systems a SIGCHLD handler must be established, or SIGCHLD will not be + // generated: struct sigaction chld_action; - chld_action.sa_handler = sigchld_handler; + chld_action.sa_handler = dprivate::sigchld_handler; sigemptyset(&chld_action.sa_mask); chld_action.sa_flags = 0; sigaction(SIGCHLD, &chld_action, nullptr); - loop_mech->addSignalWatch(SIGCHLD, nullptr); + loop_mech->add_signal_watch(SIGCHLD, nullptr); Base::init(loop_mech); } }; diff --git a/src/dasynq/dasynq-config.h b/src/dasynq/dasynq-config.h index 6c1cbca..cea00c3 100644 --- a/src/dasynq/dasynq-config.h +++ b/src/dasynq/dasynq-config.h @@ -1,18 +1,62 @@ #ifndef DASYNQ_CONFIG_H_INCLUDED #define DASYNQ_CONFIG_H_INCLUDED +// You can customise Dasynq's build options in this file. Typically, you won't need to do anything; the +// defaults are sensible for a range of operating systems, though for some BSD family OSes you may need +// to explicitly define DASYNQ_HAVE_KQUEUE to 1. Either kqueue or epoll must be available (i.e. either +// DASYNQ_HAVE_KQUEUE or DASYNQ_HAVE_EPOLL need to be defined to 1). There are two parts to the file: the +// first is the custom configuration section, where you may specify custom settings, and the second +// section contains automatic configuration to fill in remaining settings based on known features in +// certain operating systems and compilers. + +// --------------------------------------------------------------------------------------------------------- +// Part 1: Custom configuration, please edit to suit your system / requirements. +// --------------------------------------------------------------------------------------------------------- + +// Specify custom configuration here. You can define the following to 1 if the +// corresponding feature is available and should be used, or to 0 otherwise. +// +// If the kqueue/kevent system calls are available: +// #define DASYNQ_HAVE_KQUEUE 1 +// +// If the epoll family of system calls are available: +// #define DASYNQ_HAVE_KQUEUE 1 +// +// If the pipe2 system call is available: +// #define HAVE_PIPE2 1 +// +// A tag to include at the end of a class body for a class which is allowed to have zero size. +// Normally, C++ mandates that all objects (except empty base subobjects) have non-zero size, but on some +// compilers (at least GCC and LLVM-Clang) there are tricks to get around this awkward limitation. Note that +// using this theoretically creates an ABI issue if two different compilers are used to compile different +// parts of the same program which both use Dasynq, if one supports zero-sized classes and the other does +// not; consider defining this empty here if that concerns you: +// #define DASYNQ_EMPTY_BODY /* compiler specific! */ +// +// A statement to tell the compiler that the current line of code is unreachable, that is, it will never be +// the case that program execution flow reaches this statement: +// #define DASYNQ_UNREACHABLE /* compiler specific! */ + +// --------------------------------------------------------------------------------------------------------- +// Part 2: Automatic configuration begins here; you should not need to edit beyond this point. +// --------------------------------------------------------------------------------------------------------- + +#if ! defined(DASYNQ_HAVE_KQUEUE) #if defined(__OpenBSD__) || defined(__APPLE__) || defined(__FreeBSD__) #define DASYNQ_HAVE_KQUEUE 1 #endif +#endif #if defined(__linux__) +#if ! defined(DASYNQ_HAVE_EPOLL) #define DASYNQ_HAVE_EPOLL 1 #endif +#endif // General feature availability -#if defined(__OpenBSD__) || defined(__linux__) -#define HAVE_PIPE2 1 +#if (defined(__OpenBSD__) || defined(__linux__)) && ! defined(HAVE_PIPE2) +#define DASYNQ_HAVE_PIPE2 1 #endif @@ -26,7 +70,10 @@ #else #define DASYNQ_EMPTY_BODY char empty[0] __attribute__((unused)); // Make class instances take up no space (clang) #endif + +#if ! defined(DASYNQ_UNREACHABLE) #define DASYNQ_UNREACHABLE __builtin_unreachable() #endif +#endif /* __GNUC__ */ #endif diff --git a/src/dasynq/dasynq-epoll.h b/src/dasynq/dasynq-epoll.h index cdc7046..0cc5466 100644 --- a/src/dasynq/dasynq-epoll.h +++ b/src/dasynq/dasynq-epoll.h @@ -14,36 +14,56 @@ namespace dasynq { -template class EpollLoop; +template class epoll_loop; -class EpollTraits +class epoll_traits { - template friend class EpollLoop; + template friend class epoll_loop; public: - class SigInfo + class sigdata_t { - template friend class EpollLoop; + template friend class epoll_loop; struct signalfd_siginfo info; public: + // mandatory: int get_signo() { return info.ssi_signo; } int get_sicode() { return info.ssi_code; } - int get_siint() { return info.ssi_int; } - int get_ssiptr() { return info.ssi_ptr; } - int get_ssiaddr() { return info.ssi_addr; } + pid_t get_sipid() { return info.ssi_pid; } + uid_t get_siuid() { return info.ssi_uid; } + void * get_siaddr() { return reinterpret_cast(info.ssi_addr); } + int get_sistatus() { return info.ssi_status; } + int get_sival_int() { return info.ssi_int; } + void * get_sival_ptr() { return reinterpret_cast(info.ssi_ptr); } + + // XSI + int get_sierrno() { return info.ssi_errno; } + + // XSR (streams) OB (obselete) + int get_siband() { return info.ssi_band; } + + // Linux: + int32_t get_sifd() { return info.ssi_fd; } + uint32_t get_sittimerid() { return info.ssi_tid; } + uint32_t get_sioverrun() { return info.ssi_overrun; } + uint32_t get_sitrapno() { return info.ssi_trapno; } + uint32_t get_siutime() { return info.ssi_utime; } + uint32_t get_sistime() { return info.ssi_stime; } + // Field exposed by Linux kernel but not Glibc: + // uint16_t get_siaddr_lsb() { return info.ssi_addr_lsb; } void set_signo(int signo) { info.ssi_signo = signo; } }; - class FD_r; + class fd_r; // File descriptor optional storage. If the mechanism can return the file descriptor, this // class will be empty, otherwise it can hold a file descriptor. - class FD_s { - friend class FD_r; + class fd_s { + friend class fd_r; // Epoll doesn't return the file descriptor (it can, but it can't return both file // descriptor and user data). @@ -52,10 +72,10 @@ class EpollTraits // File descriptor reference (passed to event callback). If the mechanism can return the // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor - // must be stored in an FD_s instance. - class FD_r { + // must be stored in an fd_s instance. + class fd_r { public: - int getFd(FD_s ss) + int getFd(fd_s ss) { return ss.fd; } @@ -63,11 +83,10 @@ class EpollTraits const static bool has_bidi_fd_watch = true; const static bool has_separate_rw_fd_watches = false; - const static bool supports_childwatch_reservation = true; }; -template class EpollLoop : public Base +template class epoll_loop : public Base { int epfd; // epoll fd int sigfd; // signalfd fd; -1 if not initialised @@ -78,13 +97,13 @@ template class EpollLoop : public Base // Base contains: // lock - a lock that can be used to protect internal structure. // receive*() methods will be called with lock held. - // receive_signal(SigInfo &, user *) noexcept - // receiveFdEvent(FD_r, user *, int flags) noexcept + // receive_signal(sigdata_t &, user *) noexcept + // receive_fd_event(fd_r, user *, int flags) noexcept - using SigInfo = EpollTraits::SigInfo; - using FD_r = typename EpollTraits::FD_r; + using sigdata_t = epoll_traits::sigdata_t; + using fd_r = typename epoll_traits::fd_r; - void processEvents(epoll_event *events, int r) + void process_events(epoll_event *events, int r) { std::lock_guard guard(Base::lock); @@ -93,7 +112,7 @@ template class EpollLoop : public Base if (ptr == &sigfd) { // Signal - SigInfo siginfo; + sigdata_t siginfo; while (true) { int r = read(sigfd, &siginfo.info, sizeof(siginfo.info)); if (r == -1) break; @@ -113,7 +132,7 @@ template class EpollLoop : public Base (events[i].events & EPOLLHUP) && (flags |= IN_EVENTS); (events[i].events & EPOLLOUT) && (flags |= OUT_EVENTS); (events[i].events & EPOLLERR) && (flags |= IN_EVENTS | OUT_EVENTS | ERR_EVENTS); - Base::receiveFdEvent(*this, FD_r(), ptr, flags); + Base::receive_fd_event(*this, fd_r(), ptr, flags); } } } @@ -121,11 +140,11 @@ template class EpollLoop : public Base public: /** - * EpollLoop constructor. + * epoll_loop constructor. * * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised. */ - EpollLoop() : sigfd(-1) + epoll_loop() : sigfd(-1) { epfd = epoll_create1(EPOLL_CLOEXEC); if (epfd == -1) { @@ -135,7 +154,7 @@ template class EpollLoop : public Base Base::init(this); } - ~EpollLoop() + ~epoll_loop() { close(epfd); if (sigfd != -1) { @@ -150,7 +169,7 @@ template class EpollLoop : public Base // of throwing an exception // returns: true on success; false if file descriptor type isn't supported and soft_fail == true // throws: std::system_error or std::bad_alloc on failure - bool addFdWatch(int fd, void *userdata, int flags, bool enabled = true, bool soft_fail = false) + bool add_fd_watch(int fd, void *userdata, int flags, bool enabled = true, bool soft_fail = false) { struct epoll_event epevent; // epevent.data.fd = fd; @@ -176,7 +195,7 @@ template class EpollLoop : public Base return true; } - bool addBidiFdWatch(int fd, void *userdata, int flags, bool emulate) + bool add_bidi_fd_watch(int fd, void *userdata, int flags, bool emulate) { // No implementation. throw std::system_error(std::make_error_code(std::errc::not_supported)); @@ -184,25 +203,25 @@ template class EpollLoop : public Base // flags specifies which watch to remove; ignored if the loop doesn't support // separate read/write watches. - void removeFdWatch(int fd, int flags) noexcept + void remove_fd_watch(int fd, int flags) noexcept { epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr); } - void removeFdWatch_nolock(int fd, int flags) noexcept + void remove_fd_watch_nolock(int fd, int flags) noexcept { - removeFdWatch(fd, flags); + remove_fd_watch(fd, flags); } - void removeBidiFdWatch(int fd) noexcept + void remove_bidi_fd_watch(int fd) noexcept { // Shouldn't be called for epoll. - removeFdWatch(fd, IN_EVENTS | OUT_EVENTS); + remove_fd_watch(fd, IN_EVENTS | OUT_EVENTS); } // Note this will *replace* the old flags with the new, that is, // it can enable *or disable* read/write events. - void enableFdWatch(int fd, void *userdata, int flags) noexcept + void enable_fd_watch(int fd, void *userdata, int flags) noexcept { struct epoll_event epevent; // epevent.data.fd = fd; @@ -225,12 +244,12 @@ template class EpollLoop : public Base } } - void enableFdWatch_nolock(int fd, void *userdata, int flags) + void enable_fd_watch_nolock(int fd, void *userdata, int flags) { - enableFdWatch(fd, userdata, flags); + enable_fd_watch(fd, userdata, flags); } - void disableFdWatch(int fd, int flags) noexcept + void disable_fd_watch(int fd, int flags) noexcept { struct epoll_event epevent; // epevent.data.fd = fd; @@ -246,16 +265,21 @@ template class EpollLoop : public Base } } - void disableFdWatch_nolock(int fd, int flags) noexcept + void disable_fd_watch_nolock(int fd, int flags) noexcept { - disableFdWatch(fd, flags); + disable_fd_watch(fd, flags); } - + // Note signal should be masked before call. - void addSignalWatch(int signo, void *userdata) + void add_signal_watch(int signo, void *userdata) { std::lock_guard guard(Base::lock); + add_signal_watch_nolock(signo, userdata); + } + // Note signal should be masked before call. + void add_signal_watch_nolock(int signo, void *userdata) + { sigdataMap[signo] = userdata; // Modify the signal fd to watch the new signal @@ -282,22 +306,22 @@ template class EpollLoop : public Base } // Note, called with lock held: - void rearmSignalWatch_nolock(int signo) noexcept + void rearm_signal_watch_nolock(int signo, void *userdata) noexcept { sigaddset(&sigmask, signo); signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); } - void removeSignalWatch_nolock(int signo) noexcept + void remove_signal_watch_nolock(int signo) noexcept { sigdelset(&sigmask, signo); signalfd(sigfd, &sigmask, 0); } - void removeSignalWatch(int signo) noexcept + void remove_signal_watch(int signo) noexcept { std::lock_guard guard(Base::lock); - removeSignalWatch_nolock(signo); + remove_signal_watch_nolock(signo); } // If events are pending, process an unspecified number of them. @@ -306,11 +330,11 @@ template class EpollLoop : public Base // simultaneously). // If processing an event removes a watch, there is a possibility // that the watched event will still be reported (if it has - // occurred) before pullEvents() returns. + // occurred) before pull_events() returns. // // do_wait - if false, returns immediately if no events are // pending. - void pullEvents(bool do_wait) + void pull_events(bool do_wait) { epoll_event events[16]; int r = epoll_wait(epfd, events, 16, do_wait ? -1 : 0); @@ -320,7 +344,7 @@ template class EpollLoop : public Base } do { - processEvents(events, r); + process_events(events, r); r = epoll_wait(epfd, events, 16, 0); } while (r > 0); } diff --git a/src/dasynq/dasynq-interrupt.h b/src/dasynq/dasynq-interrupt.h index 5864220..07ee12d 100644 --- a/src/dasynq/dasynq-interrupt.h +++ b/src/dasynq/dasynq-interrupt.h @@ -6,6 +6,7 @@ #include "dasynq-config.h" #include "dasynq-mutex.h" +#include "dasynq-util.h" /* * Mechanism for interrupting an event loop wait. @@ -27,24 +28,10 @@ template class interrupt_channel : public Base template class interrupt_channel : public Base { -#ifdef HAVE_PIPE2 - int create_pipe(int filedes[2]) + static inline int create_pipe(int filedes[2]) { return pipe2(filedes, O_CLOEXEC | O_NONBLOCK); } -#else - int create_pipe(int filedes[2]) - { - int r = pipe(filedes); - if (r != -1) { - fcntl(filedes[0], F_SETFD, O_CLOEXEC); - fcntl(filedes[1], F_SETFD, O_CLOEXEC); - fcntl(filedes[0], F_SETFL, O_NONBLOCK); - fcntl(filedes[1], F_SETFL, O_NONBLOCK); - } - return r; - } -#endif int pipe_r_fd; int pipe_w_fd; @@ -62,7 +49,7 @@ template class interrupt_channel : public Base pipe_w_fd = pipedes[1]; try { - loop_mech->addFdWatch(pipe_r_fd, &pipe_r_fd, IN_EVENTS); + loop_mech->add_fd_watch(pipe_r_fd, &pipe_r_fd, IN_EVENTS); } catch (...) { close (pipe_r_fd); @@ -74,7 +61,7 @@ template class interrupt_channel : public Base } template - void receiveFdEvent(T &loop_mech, typename Base::FD_r fd_r, void * userdata, int flags) + void receive_fd_event(T &loop_mech, typename Base::traits_t::fd_r fd_r_a, void * userdata, int flags) { if (userdata == &pipe_r_fd) { // try to clear the pipe @@ -82,7 +69,7 @@ template class interrupt_channel : public Base read(pipe_r_fd, buf, 64); } else { - Base::receiveFdEvent(loop_mech, fd_r, userdata, flags); + Base::receive_fd_event(loop_mech, fd_r_a, userdata, flags); } } diff --git a/src/dasynq/dasynq-itimer.h b/src/dasynq/dasynq-itimer.h index 99f360f..c4d42a1 100644 --- a/src/dasynq/dasynq-itimer.h +++ b/src/dasynq/dasynq-itimer.h @@ -10,58 +10,69 @@ namespace dasynq { // Timer implementation based on the (basically obsolete) POSIX itimer interface. -template class ITimerEvents : public timer_base +// With this timer implementation, we only use one clock, and allow no distinction between the +// monotonic and system time. + +template class itimer_events : public timer_base { private: - timer_queue_t timer_queue; - -#if defined(CLOCK_MONOTONIC) - static inline void get_curtime(struct timespec &curtime) - { - clock_gettime(CLOCK_MONOTONIC, &curtime); - } -#else - static inline void get_curtime(struct timespec &curtime) - { - struct timeval curtime_tv; - gettimeofday(&curtime_tv, nullptr); - curtime.tv_sec = curtime_tv.tv_sec; - curtime.tv_nsec = curtime_tv.tv_usec * 1000; - } -#endif - // Set the timerfd timeout to match the first timer in the queue (disable the timerfd - // if there are no active timers). + // Set the alarm timeout to match the first timer in the queue (disable the alarm if there are no + // active timers). void set_timer_from_queue() { time_val newtime; struct itimerval newalarm; - if (timer_queue.empty()) { - newalarm.it_value = {0, 0}; - newalarm.it_interval = {0, 0}; - setitimer(ITIMER_REAL, &newalarm, nullptr); - return; + + bool interval_set = false; + time_val interval_tv = {0, 0}; + + auto &timer_queue = this->queue_for_clock(clock_type::SYSTEM); + if (! timer_queue.empty()) { + newtime = timer_queue.get_root_priority(); + + time_val curtimev; + timer_base::get_time(curtimev, clock_type::SYSTEM, true); + + // interval before next timeout: + if (curtimev < newtime) { + interval_tv = newtime - curtimev; + } + + interval_set = true; } - newtime = timer_queue.get_root_priority(); - - struct timespec curtime; - get_curtime(curtime); - time_val curtimev = curtime; - - newalarm.it_interval = {0, 0}; - if (curtimev < newtime) { - newalarm.it_value.tv_sec = newtime.seconds() - curtime.tv_sec; - if (curtimev.nseconds() > newtime.nseconds()) { - newalarm.it_value.tv_usec = (1000000000 - curtimev.nseconds() - + newtime.nseconds()) / 1000; - newalarm.it_value.tv_sec--; +#ifdef CLOCK_MONOTONIC + auto &mono_timer_queue = this->queue_for_clock(clock_type::MONOTONIC); + + if (! mono_timer_queue.empty()) { + + // If we have a separate monotonic clock, we get the interval for the expiry of the next monotonic + // timer and use the lesser of the system interval and monotonic interval: + time_val mono_newtime = mono_timer_queue.get_root_priority(); + + time_val curtimev_mono; + timer_base::get_time(curtimev_mono, clock_type::MONOTONIC, true); + + time_val interval_mono = {0, 0}; + if (curtimev_mono < mono_newtime) { + interval_mono = mono_newtime - curtimev_mono; } - else { - newalarm.it_value.tv_usec = (newtime.nseconds() - curtime.tv_nsec) / 1000; + + if (! interval_set || interval_mono < interval_tv) { + interval_tv = interval_mono; } + + interval_set = true; } - else { +#endif + + newalarm.it_value.tv_sec = interval_tv.seconds(); + newalarm.it_value.tv_usec = interval_tv.nseconds() / 1000; + newalarm.it_interval.tv_sec = 0; + newalarm.it_interval.tv_usec = 0; + + if (interval_set && newalarm.it_value.tv_sec == 0 && newalarm.it_value.tv_usec == 0) { // We passed the timeout: set alarm to expire immediately (we must use {0,1} as // {0,0} disables the timer). // TODO: it would be better if we just processed the appropriate timers here, @@ -76,18 +87,29 @@ template class ITimerEvents : public timer_base protected: - using SigInfo = typename Base::SigInfo; + using sigdata_t = typename Base::sigdata_t; template - bool receive_signal(T & loop_mech, SigInfo &siginfo, void *userdata) + bool receive_signal(T & loop_mech, sigdata_t &siginfo, void *userdata) { if (siginfo.get_signo() == SIGALRM) { - struct timespec curtime; - get_curtime(curtime); + auto &timer_queue = this->queue_for_clock(clock_type::SYSTEM); + if (! timer_queue.empty()) { + struct timespec curtime; + timer_base::get_time(curtime, clock_type::SYSTEM, true); + timer_base::process_timer_queue(timer_queue, curtime); + } - timer_base::process_timer_queue(timer_queue, curtime); +#ifdef CLOCK_MONOTONIC + auto &mono_timer_queue = this->queue_for_clock(clock_type::MONOTONIC); + if (! mono_timer_queue.empty()) { + struct timespec curtime_mono; + timer_base::get_time(curtime_mono, clock_type::MONOTONIC, true); + timer_base::process_timer_queue(mono_timer_queue, curtime_mono); + } +#endif - // arm timerfd with timeout from head of queue + // arm alarm with timeout from head of queue set_timer_from_queue(); return false; // don't disable signal watch } @@ -98,41 +120,27 @@ template class ITimerEvents : public timer_base public: + class traits_t : public Base::traits_t + { + constexpr static bool full_timer_support = false; + }; + template void init(T *loop_mech) { sigset_t sigmask; - sigprocmask(SIG_UNBLOCK, nullptr, &sigmask); + this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask); sigaddset(&sigmask, SIGALRM); - sigprocmask(SIG_SETMASK, &sigmask, nullptr); - loop_mech->addSignalWatch(SIGALRM, nullptr); + this->sigmaskf(SIG_SETMASK, &sigmask, nullptr); + loop_mech->add_signal_watch(SIGALRM, nullptr); Base::init(loop_mech); } - - void addTimer(timer_handle_t &h, void *userdata, clock_type clock = clock_type::MONOTONIC) - { - std::lock_guard guard(Base::lock); - timer_queue.allocate(h, userdata); - } - - void removeTimer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept - { - std::lock_guard guard(Base::lock); - removeTimer_nolock(timer_id, clock); - } - - void removeTimer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept - { - if (timer_queue.is_queued(timer_id)) { - timer_queue.remove(timer_id); - } - timer_queue.deallocate(timer_id); - } // starts (if not started) a timer to timeout at the given time. Resets the expiry count to 0. // enable: specifies whether to enable reporting of timeouts/intervals - void setTimer(timer_handle_t &timer_id, const time_val &timeouttv, const time_val &intervaltv, + void set_timer(timer_handle_t &timer_id, const time_val &timeouttv, const time_val &intervaltv, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept { + auto &timer_queue = this->queue_for_clock(clock); timespec timeout = timeouttv; timespec interval = intervaltv; @@ -157,44 +165,23 @@ template class ITimerEvents : public timer_base } // Set timer relative to current time: - void setTimerRel(timer_handle_t &timer_id, const time_val &timeouttv, const time_val &intervaltv, + void set_timer_rel(timer_handle_t &timer_id, const time_val &timeouttv, const time_val &intervaltv, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept { timespec timeout = timeouttv; timespec interval = intervaltv; - // TODO consider caching current time somehow; need to decide then when to update cached value. struct timespec curtime; - get_curtime(curtime); + timer_base::get_time(curtime, clock, false); curtime.tv_sec += timeout.tv_sec; curtime.tv_nsec += timeout.tv_nsec; if (curtime.tv_nsec > 1000000000) { curtime.tv_nsec -= 1000000000; curtime.tv_sec++; } - setTimer(timer_id, curtime, interval, enable, clock); - } - - // Enables or disabling report of timeouts (does not stop timer) - void enableTimer(timer_handle_t &timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept - { - std::lock_guard guard(Base::lock); - enableTimer_nolock(timer_id, enable, clock); + set_timer(timer_id, curtime, interval, enable, clock); } - void enableTimer_nolock(timer_handle_t &timer_id, bool enable, clock_type = clock_type::MONOTONIC) noexcept - { - auto &node_data = timer_queue.node_data(timer_id); - auto expiry_count = node_data.expiry_count; - if (expiry_count != 0) { - node_data.expiry_count = 0; - Base::receiveTimerExpiry(timer_id, node_data.userdata, expiry_count); - } - else { - timer_queue.node_data(timer_id).enabled = enable; - } - } - void stop_timer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept { std::lock_guard guard(Base::lock); @@ -203,6 +190,7 @@ template class ITimerEvents : public timer_base void stop_timer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept { + auto &timer_queue = this->queue_for_clock(clock); if (timer_queue.is_queued(timer_id)) { bool was_first = (&timer_queue.get_root()) == &timer_id; timer_queue.remove(timer_id); @@ -211,18 +199,6 @@ template class ITimerEvents : public timer_base } } } - - void get_time(time_val &tv, clock_type clock, bool force_update) noexcept - { - timespec ts; - get_time(ts, clock, force_update); - tv = ts; - } - - void get_time(timespec &ts, clock_type clock, bool force_update) noexcept - { - get_curtime(ts); - } }; } diff --git a/src/dasynq/dasynq-kqueue.h b/src/dasynq/dasynq-kqueue.h index 4f586b4..b475f00 100644 --- a/src/dasynq/dasynq-kqueue.h +++ b/src/dasynq/dasynq-kqueue.h @@ -4,18 +4,11 @@ #include #include -#ifdef __OpenBSD__ -#include // for __thrsigdivert aka sigtimedwait -#include -extern "C" { - int __thrsigdivert(sigset_t set, siginfo_t *info, const struct timespec * timeout); -} -#endif - #include #include #include #include +#include #include #include @@ -36,74 +29,72 @@ extern "C" { namespace dasynq { -template class KqueueLoop; +template class kqueue_loop; -class KqueueTraits +class kqueue_traits { - template friend class KqueueLoop; + template friend class kqueue_loop; public: - class SigInfo + class sigdata_t { - template friend class KqueueLoop; + template friend class kqueue_loop; siginfo_t info; public: + // mandatory: int get_signo() { return info.si_signo; } int get_sicode() { return info.si_code; } - void * get_ssiaddr() { return info.si_addr; } + pid_t get_sipid() { return info.si_pid; } + uid_t get_siuid() { return info.si_uid; } + void * get_siaddr() { return info.si_addr; } + int get_sistatus() { return info.si_status; } + int get_sival_int() { return info.si_value.sival_int; } + void * get_sival_ptr() { return info.si_value.sival_ptr; } + // XSI + int get_sierrno() { return info.si_errno; } + + // XSR (streams) OB (obselete) +#if !defined(__OpenBSD__) + // Note: OpenBSD doesn't have this; most other systems do. Technically it is part of the STREAMS + // interface. + int get_siband() { return info.si_band; } +#endif + void set_signo(int signo) { info.si_signo = signo; } }; - class FD_r; + class fd_r; // File descriptor optional storage. If the mechanism can return the file descriptor, this // class will be empty, otherwise it can hold a file descriptor. - class FD_s { + class fd_s { DASYNQ_EMPTY_BODY }; // File descriptor reference (passed to event callback). If the mechanism can return the // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor - // must be stored in an FD_s instance. - class FD_r { + // must be stored in an fd_s instance. + class fd_r { int fd; public: - int getFd(FD_s ss) + int getFd(fd_s ss) { return fd; } - FD_r(int nfd) : fd(nfd) + fd_r(int nfd) : fd(nfd) { } }; const static bool has_bidi_fd_watch = false; const static bool has_separate_rw_fd_watches = true; - const static bool supports_childwatch_reservation = true; }; -#if defined(__OpenBSD__) && _POSIX_REALTIME_SIGNALS <= 0 -// OpenBSD has no sigtimedwait (or sigwaitinfo) but does have "__thrsigdivert", which is -// essentially an incomplete version of the same thing. Discussion with OpenBSD developer -// Ted Unangst suggested that the siginfo_t structure returned might not always have all fields -// set correctly. Furthermore there is a bug (at least in 5.9) such that specifying a zero -// timeout (or indeed any timeout less than a tick) results in NO timeout. We get around this by -// instead specifying an *invalid* timeout, which won't error out if a signal is pending. -static inline int sigtimedwait(const sigset_t *ssp, siginfo_t *info, struct timespec *timeout) -{ - // We know that we're only called with a timeout of 0 (which doesn't work properly) and - // that we safely overwrite the timeout. So, we set tv_nsec to an invalid value, which - // will cause EINVAL to be returned, but will still pick up any pending signals *first*. - timeout->tv_nsec = 1000000001; - return __thrsigdivert(*ssp, info, timeout); -} -#endif - -#if defined(__OpenBSD__) || _POSIX_REALTIME_SIGNALS > 0 +#if _POSIX_REALTIME_SIGNALS > 0 static inline void prepare_signal(int signo) { } static inline void unprep_signal(int signo) { } @@ -168,53 +159,61 @@ inline bool get_siginfo(int signo, siginfo_t *siginfo) #endif -template class KqueueLoop : public Base +template class kqueue_loop : public Base { int kqfd; // kqueue fd - sigset_t sigmask; // enabled signal watch mask - // Map of signal number to user data pointer. If kqueue had been better thought-through, - // we shouldn't need this. Although we can associate user data with an EVFILT_SIGNAL kqueue - // filter, the problem is that the kqueue signal report *coexists* with the regular signal + // The kqueue signal reporting mechanism *coexists* with the regular signal // delivery mechanism without having any connection to it. Whereas regular signals can be // queued (especially "realtime" signals, via sigqueue()), kqueue just maintains a counter // of delivery attempts and clears this when we read the event. What this means is that // kqueue won't necessarily tell us if signals are pending, in the case that: // 1) it already reported the attempted signal delivery and - // 2) more than one of the same signal was pending at that time and + // 2) more than instance one of the same signal was pending at that time and // 3) no more deliveries of the same signal have been attempted in the meantime. - // Of course, if kqueue doesn't report the signal, then it doesn't give us the data associated - // with the event, so we need to maintain that separately too: - std::unordered_map sigdataMap; - + // Also, kqueue won't tell us about signals that were pending at the time the signal filter + // was added. Finally, because pending signals can be merged, the count of delivery attempts + // provided by kqueue does not necessarily match the number of signals actually pending. + // + // Note that POSIX allows for multiple instances of a signal to be pending even on systems + // that don't support queueing of signals. + // + // Ultimately, this means we need to check for pending signals independently of what kqueue + // tells us. + // Base contains: // lock - a lock that can be used to protect internal structure. // receive*() methods will be called with lock held. - // receiveSignal(SigInfo &, user *) noexcept - // receiveFdEvent(FD_r, user *, int flags) noexcept + // receive_signal(sigdata_t &, user *) noexcept + // receive_fd_event(fd_r, user *, int flags) noexcept - using SigInfo = KqueueTraits::SigInfo; - using FD_r = typename KqueueTraits::FD_r; + using sigdata_t = kqueue_traits::sigdata_t; + using fd_r = typename kqueue_traits::fd_r; - void processEvents(struct kevent *events, int r) + // The flag to specify poll() semantics for regular file readiness: that is, we want + // ready-for-read to be returned even at end of file: +#if defined(NOTE_FILE_POLL) + // FreeBSD: + constexpr static int POLL_SEMANTICS = NOTE_FILE_POLL; +#else + // Note that macOS has an "EV_POLL" defined that looks like it should give poll semantics + // when passed as a flag. However, it is filtered at the syscall entry so we cannot use it in + // kqueue. (The kernel uses it internally to implement poll()). + constexpr static int POLL_SEMANTICS = 0; +#endif + + void process_events(struct kevent *events, int r) { std::lock_guard guard(Base::lock); for (int i = 0; i < r; i++) { if (events[i].filter == EVFILT_SIGNAL) { - SigInfo siginfo; - if (get_siginfo(events[i].ident, &siginfo.info) - && Base::receive_signal(*this, siginfo, (void *)events[i].udata)) { - sigdelset(&sigmask, events[i].ident); - events[i].flags = EV_DISABLE; - } - else { - events[i].flags = EV_ENABLE; - } + bool reenable = pull_signal(events[i].ident, events[i].udata); + events[i].flags = reenable ? EV_ENABLE : EV_DISABLE; } else if (events[i].filter == EVFILT_READ || events[i].filter == EVFILT_WRITE) { int flags = events[i].filter == EVFILT_READ ? IN_EVENTS : OUT_EVENTS; - Base::receiveFdEvent(*this, FD_r(events[i].ident), events[i].udata, flags); + Base::receive_fd_event(*this, fd_r(events[i].ident), events[i].udata, flags); events[i].flags = EV_DISABLE | EV_CLEAR; // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for // another connection). @@ -228,39 +227,76 @@ template class KqueueLoop : public Base kevent(kqfd, events, r, nullptr, 0, nullptr); } + // Pull a signal from pending, and report it, until it is no longer pending or the watch + // should be disabled. Call with lock held. + // Returns: true if watcher should be enabled, false if disabled. + bool pull_signal(int signo, void *userdata) + { + bool enable_filt = true; + sigdata_t siginfo; + +#if _POSIX_REALTIME_SIGNALS > 0 + struct timespec timeout = {0, 0}; + sigset_t sigw_mask; + sigemptyset(&sigw_mask); + sigaddset(&sigw_mask, signo); + int rsigno = sigtimedwait(&sigw_mask, &siginfo.info, &timeout); + while (rsigno > 0) { + if (Base::receive_signal(*this, siginfo, userdata)) { + enable_filt = false; + break; + } + rsigno = sigtimedwait(&sigw_mask, &siginfo.info, &timeout); + } +#else + // we have no sigtimedwait. + sigset_t pending_sigs; + sigpending(&pending_sigs); + while (sigismember(&pending_sigs, signo)) { + get_siginfo(signo, &siginfo.info); + if (Base::receive_signal(*this, siginfo, userdata)) { + enable_filt = false; + break; + } + sigpending(&pending_sigs); + } +#endif + return enable_filt; + } + public: /** - * KqueueLoop constructor. + * kqueue_loop constructor. * * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised. */ - KqueueLoop() + kqueue_loop() { kqfd = kqueue(); if (kqfd == -1) { throw std::system_error(errno, std::system_category()); } - sigemptyset(&sigmask); Base::init(this); } - ~KqueueLoop() + ~kqueue_loop() { close(kqfd); } - void setFilterEnabled(short filterType, uintptr_t ident, void *udata, bool enable) + void set_filter_enabled(short filterType, uintptr_t ident, void *udata, bool enable) { - // Note, on OpenBSD enabling or disabling filter will not alter the filter parameters (udata etc); + // Note, on OpenBSD enabling or disabling filter will not alter the filter parameters (udata etc); // on OS X however, it will. Therefore we set udata here (to the same value as it was originally // set) in order to work correctly on both kernels. struct kevent kev; - EV_SET(&kev, ident, filterType, enable ? EV_ENABLE : EV_DISABLE, 0, 0, udata); + int fflags = (filterType == EVFILT_READ) ? POLL_SEMANTICS : 0; + EV_SET(&kev, ident, filterType, enable ? EV_ENABLE : EV_DISABLE, fflags, 0, udata); kevent(kqfd, &kev, 1, nullptr, 0, nullptr); } - void removeFilter(short filterType, uintptr_t ident) + void remove_filter(short filterType, uintptr_t ident) { struct kevent kev; EV_SET(&kev, ident, filterType, EV_DELETE, 0, 0, 0); @@ -273,17 +309,31 @@ template class KqueueLoop : public Base // (only one of IN_EVENTS/OUT_EVENTS can be specified) // soft_fail: true if unsupported file descriptors should fail by returning false instead // of throwing an exception - // returns: true on success; false if file descriptor type isn't supported and soft_fail == true + // returns: true on success; false if file descriptor type isn't supported and emulate == true // throws: std::system_error or std::bad_alloc on failure - bool addFdWatch(int fd, void *userdata, int flags, bool enabled = true, bool emulate = false) + bool add_fd_watch(int fd, void *userdata, int flags, bool enabled = true, bool emulate = false) { short filter = (flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE; + if (filter == EVFILT_READ && POLL_SEMANTICS == 0 && emulate) { + // We can't request poll semantics, so check for regular file: + struct stat statbuf; + if (fstat(fd, &statbuf) == -1) { + throw new std::system_error(errno, std::system_category()); + } + if ((statbuf.st_mode & S_IFMT) == S_IFREG) { + // Regular file: emulation required + return false; + } + } + + int fflags = (filter == EVFILT_READ) ? POLL_SEMANTICS : 0; + struct kevent kev; - EV_SET(&kev, fd, filter, EV_ADD | (enabled ? 0 : EV_DISABLE), 0, 0, userdata); + EV_SET(&kev, fd, filter, EV_ADD | (enabled ? 0 : EV_DISABLE), fflags, 0, userdata); if (kevent(kqfd, &kev, 1, nullptr, 0, nullptr) == -1) { - // Note that kqueue supports EVFILT_READ on regular file fd's, but not EVFIL_WRITE. - if (filter == EVFILT_WRITE && errno == EINVAL) { + // Note that kqueue supports EVFILT_READ on regular file fd's, but not EVFILT_WRITE. + if (filter == EVFILT_WRITE && errno == EINVAL && emulate) { return false; // emulate } throw new std::system_error(errno, std::system_category()); @@ -294,14 +344,14 @@ template class KqueueLoop : public Base // returns: 0 on success // IN_EVENTS if in watch requires emulation // OUT_EVENTS if out watch requires emulation - int addBidiFdWatch(int fd, void *userdata, int flags, bool emulate = false) + int add_bidi_fd_watch(int fd, void *userdata, int flags, bool emulate = false) { #ifdef EV_RECEIPT struct kevent kev[2]; struct kevent kev_r[2]; short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT; short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT; - EV_SET(&kev[0], fd, EVFILT_READ, rflags, 0, 0, userdata); + EV_SET(&kev[0], fd, EVFILT_READ, rflags, POLL_SEMANTICS, 0, userdata); EV_SET(&kev[1], fd, EVFILT_WRITE, wflags, 0, 0, userdata); int r = kevent(kqfd, kev, 2, kev_r, 2, nullptr); @@ -323,7 +373,16 @@ template class KqueueLoop : public Base if (kev_r[1].data != 0) { if (emulate) { - return OUT_EVENTS; + // We can emulate, but, do we have correct semantics? + if (POLL_SEMANTICS != 0) { + return OUT_EVENTS; + } + + // if we can't get poll semantics, emulate for read as well: + // first remove read watch: + EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata); + kevent(kqfd, kev, 1, nullptr, 0, nullptr); + return IN_EVENTS | OUT_EVENTS; } // remove read watch EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata); @@ -368,17 +427,17 @@ template class KqueueLoop : public Base // flags specifies which watch to remove; ignored if the loop doesn't support // separate read/write watches. - void removeFdWatch(int fd, int flags) + void remove_fd_watch(int fd, int flags) { - removeFilter((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd); + remove_filter((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd); } - void removeFdWatch_nolock(int fd, int flags) + void remove_fd_watch_nolock(int fd, int flags) { - removeFdWatch(fd, flags); + remove_fd_watch(fd, flags); } - void removeBidiFdWatch(int fd) noexcept + void remove_bidi_fd_watch(int fd) noexcept { struct kevent kev[2]; EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr); @@ -387,61 +446,76 @@ template class KqueueLoop : public Base kevent(kqfd, kev, 2, nullptr, 0, nullptr); } - void enableFdWatch(int fd, void *userdata, int flags) + void enable_fd_watch(int fd, void *userdata, int flags) { - setFilterEnabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, userdata, true); + set_filter_enabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, userdata, true); } - void enableFdWatch_nolock(int fd, void *userdata, int flags) + void enable_fd_watch_nolock(int fd, void *userdata, int flags) { - enableFdWatch(fd, userdata, flags); + enable_fd_watch(fd, userdata, flags); } - void disableFdWatch(int fd, int flags) + void disable_fd_watch(int fd, int flags) { - setFilterEnabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, nullptr, false); + set_filter_enabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, nullptr, false); } - void disableFdWatch_nolock(int fd, int flags) + void disable_fd_watch_nolock(int fd, int flags) { - disableFdWatch(fd, flags); + disable_fd_watch(fd, flags); } - + // Note signal should be masked before call. - void addSignalWatch(int signo, void *userdata) + void add_signal_watch(int signo, void *userdata) { std::lock_guard guard(Base::lock); - - sigdataMap[signo] = userdata; - sigaddset(&sigmask, signo); - + add_signal_watch_nolock(signo, userdata); + } + + // Note signal should be masked before call. + void add_signal_watch_nolock(int signo, void *userdata) + { prepare_signal(signo); + // We need to register the filter with the kqueue early, to avoid a race where we miss + // signals: struct kevent evt; - EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ADD, 0, 0, userdata); - // TODO use EV_DISPATCH if available (not on OpenBSD/OS X) - + EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ADD | EV_DISABLE, 0, 0, userdata); if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) { throw new std::system_error(errno, std::system_category()); } + // TODO use EV_DISPATCH if available (not on OpenBSD/OS X) + + // The signal might be pending already but won't be reported by kqueue in that case. We can queue + // it immediately (note that it might be pending multiple times, so we need to re-check once signal + // processing finishes if it is re-armed). + + bool enable_filt = pull_signal(signo, userdata); + + if (enable_filt) { + evt.flags = EV_ENABLE; + if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) { + throw new std::system_error(errno, std::system_category()); + } + } } // Note, called with lock held: - void rearmSignalWatch_nolock(int signo) noexcept + void rearm_signal_watch_nolock(int signo, void *userdata) noexcept { - sigaddset(&sigmask, signo); - - struct kevent evt; - EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ENABLE, 0, 0, 0); - // TODO use EV_DISPATCH if available (not on OpenBSD) - - kevent(kqfd, &evt, 1, nullptr, 0, nullptr); + if (pull_signal(signo, userdata)) { + struct kevent evt; + EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ENABLE, 0, 0, userdata); + // TODO use EV_DISPATCH if available (not on OpenBSD) + + kevent(kqfd, &evt, 1, nullptr, 0, nullptr); + } } - void removeSignalWatch_nolock(int signo) noexcept + void remove_signal_watch_nolock(int signo) noexcept { unprep_signal(signo); - sigdelset(&sigmask, signo); struct kevent evt; EV_SET(&evt, signo, EVFILT_SIGNAL, EV_DELETE, 0, 0, 0); @@ -449,47 +523,10 @@ template class KqueueLoop : public Base kevent(kqfd, &evt, 1, nullptr, 0, nullptr); } - void removeSignalWatch(int signo) noexcept + void remove_signal_watch(int signo) noexcept { std::lock_guard guard(Base::lock); - removeSignalWatch_nolock(signo); - } - - private: - - // We actually need to check pending signals before polling the kqueue, since kqueue can - // count signals as they are delivered but the count is cleared when we poll the kqueue, - // meaning that signals might still be pending if they were queued multiple times at the - // last poll (since we report only one signal delivery at a time and the watch is - // automatically disabled each time). - // - // The check is not necessary on systems that don't queue signals. -void pull_signals() - { -#if _POSIX_REALTIME_SIGNALS > 0 - // TODO we should only poll for signals that *have* been reported - // as being raised more than once prior via kevent, rather than all - // signals that have been registered - in many cases that may allow - // us to skip the sigtimedwait call altogether. - { - std::lock_guard guard(Base::lock); - - struct timespec timeout; - timeout.tv_sec = 0; - timeout.tv_nsec = 0; - SigInfo siginfo; - int rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout); - while (rsigno > 0) { - if (Base::receiveSignal(*this, siginfo, sigdataMap[rsigno])) { - sigdelset(&sigmask, rsigno); - // TODO accumulate and disable multiple filters with a single kevents call - // rather than disabling each individually - setFilterEnabled(EVFILT_SIGNAL, rsigno, sigdataMap[rsigno], false); - } - rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout); - } - } -#endif + remove_signal_watch_nolock(signo); } public: @@ -500,16 +537,15 @@ void pull_signals() // simultaneously). // If processing an event removes a watch, there is a possibility // that the watched event will still be reported (if it has - // occurred) before pullEvents() returns. + // occurred) before pull_events() returns. // // do_wait - if false, returns immediately if no events are // pending. - void pullEvents(bool do_wait) + void pull_events(bool do_wait) { - pull_signals(); - struct kevent events[16]; struct timespec ts; + ts.tv_sec = 0; ts.tv_nsec = 0; int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts); @@ -519,7 +555,7 @@ void pull_signals() } do { - processEvents(events, r); + process_events(events, r); r = kevent(kqfd, nullptr, 0, events, 16, &ts); } while (r > 0); } diff --git a/src/dasynq/dasynq-mutex.h b/src/dasynq/dasynq-mutex.h index 312c22d..b4023e7 100644 --- a/src/dasynq/dasynq-mutex.h +++ b/src/dasynq/dasynq-mutex.h @@ -1,55 +1,22 @@ #ifndef DASYNQ_MUTEX_H_INCLUDED #define DASYNQ_MUTEX_H_INCLUDED -//#include #include namespace dasynq { -// Simple non-recursive mutex, with priority inheritance to avoid priority inversion. -/* -class DMutex -{ - private: - pthread_mutex_t mutex; - - public: - DMutex() - { - // Avoid priority inversion by using PTHREAD_PRIO_INHERIT - pthread_mutexattr_t attribs; - pthread_mutexattr_init(&attribs); - pthread_mutexattr_setprotocol(&attribs, PTHREAD_PRIO_INHERIT); - pthread_mutex_init(&mutex, &attribs); - } - - void lock() - { - pthread_mutex_lock(&mutex); - } - - void unlock() - { - pthread_mutex_unlock(&mutex); - } -}; -*/ - -using DMutex = std::mutex; - // A "null" mutex, for which locking / unlocking actually does nothing. class null_mutex { - DASYNQ_EMPTY_BODY - public: void lock() { } void unlock() { } void try_lock() { } -}; + private: + DASYNQ_EMPTY_BODY; +}; } // end of namespace - #endif diff --git a/src/dasynq/dasynq-naryheap.h b/src/dasynq/dasynq-naryheap.h index 4156a42..7713b81 100644 --- a/src/dasynq/dasynq-naryheap.h +++ b/src/dasynq/dasynq-naryheap.h @@ -1,7 +1,7 @@ #ifndef DASYNC_NARYHEAP_H_INCLUDED #define DASYNC_NARYHEAP_H_INCLUDED -#include "dasynq-svec.h" +#include #include #include #include @@ -51,7 +51,7 @@ class NaryHeap HeapNode() { } }; - svector hvec; + std::vector hvec; using hindex_t = typename decltype(hvec)::size_type; @@ -259,8 +259,7 @@ class NaryHeap { new (& hnd.hd) T(u...); hnd.heap_index = -1; - constexpr hindex_t max_allowed = std::numeric_limits::is_signed ? - std::numeric_limits::max() : ((hindex_t) - 2); + hindex_t max_allowed = hvec.max_size(); if (num_nodes == max_allowed) { throw std::bad_alloc(); @@ -289,10 +288,14 @@ class NaryHeap { num_nodes--; - // shrink the capacity of hvec if num_nodes is sufficiently less than - // its current capacity: + // shrink the capacity of hvec if num_nodes is sufficiently less than its current capacity. Why + // capacity/4? Because in general, capacity must be at least doubled when it is exceeded to get + // O(N) amortised insertion cost. If we shrink-to-fit every time num_nodes < capacity/2, this + // means we might potentially get pathological "bouncing" as num_nodes crosses the threshold + // repeatedly as nodes get added and removed. + if (num_nodes < hvec.capacity() / 4) { - hvec.shrink_to(num_nodes * 2); + hvec.shrink_to_fit(); } } diff --git a/src/dasynq/dasynq-posixtimer.h b/src/dasynq/dasynq-posixtimer.h index 85a0304..3473330 100644 --- a/src/dasynq/dasynq-posixtimer.h +++ b/src/dasynq/dasynq-posixtimer.h @@ -13,12 +13,9 @@ namespace dasynq { // Timer implementation based on POSIX create_timer et al. // May require linking with -lrt -template class PosixTimerEvents : public timer_base +template class posix_timer_events : public timer_base { private: - timer_queue_t real_timer_queue; - timer_queue_t mono_timer_queue; - timer_t real_timer; timer_t mono_timer; @@ -42,27 +39,29 @@ template class PosixTimerEvents : public timer_base protected: - using SigInfo = typename Base::SigInfo; + using sigdata_t = typename Base::sigdata_t; template - bool receive_signal(T & loop_mech, SigInfo &siginfo, void *userdata) + bool receive_signal(T & loop_mech, sigdata_t &siginfo, void *userdata) { + auto &real_timer_queue = this->queue_for_clock(clock_type::SYSTEM); + auto &mono_timer_queue = this->queue_for_clock(clock_type::MONOTONIC); + if (siginfo.get_signo() == SIGALRM) { - struct timespec curtime; + time_val curtime; if (! real_timer_queue.empty()) { - clock_gettime(CLOCK_REALTIME, &curtime); - timer_base::process_timer_queue(real_timer_queue, curtime); + this->get_time(curtime, clock_type::SYSTEM, true); + this->process_timer_queue(real_timer_queue, curtime.get_timespec()); set_timer_from_queue(real_timer, real_timer_queue); } if (! mono_timer_queue.empty()) { - clock_gettime(CLOCK_MONOTONIC, &curtime); - timer_base::process_timer_queue(mono_timer_queue, curtime); + this->get_time(curtime, clock_type::MONOTONIC, true); + this->process_timer_queue(mono_timer_queue, curtime); set_timer_from_queue(mono_timer, mono_timer_queue); } - // loop_mech.rearmSignalWatch_nolock(SIGALRM); return false; // don't disable signal watch } else { @@ -70,18 +69,6 @@ template class PosixTimerEvents : public timer_base } } - timer_queue_t &queue_for_clock(clock_type clock) - { - switch (clock) { - case clock_type::MONOTONIC: - return mono_timer_queue; - case clock_type::SYSTEM: - return real_timer_queue; - default: - DASYNQ_UNREACHABLE; - } - } - timer_t &timer_for_clock(clock_type clock) { switch (clock) { @@ -96,13 +83,18 @@ template class PosixTimerEvents : public timer_base public: + class traits_t : public Base::traits_t + { + constexpr static bool full_timer_support = true; + }; + template void init(T *loop_mech) { sigset_t sigmask; - sigprocmask(SIG_UNBLOCK, nullptr, &sigmask); + this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask); sigaddset(&sigmask, SIGALRM); - sigprocmask(SIG_SETMASK, &sigmask, nullptr); - loop_mech->addSignalWatch(SIGALRM, nullptr); + this->sigmaskf(SIG_SETMASK, &sigmask, nullptr); + loop_mech->add_signal_watch(SIGALRM, nullptr); struct sigevent timer_sigevent; timer_sigevent.sigev_notify = SIGEV_SIGNAL; @@ -123,38 +115,16 @@ template class PosixTimerEvents : public timer_base Base::init(loop_mech); } - void addTimer(timer_handle_t &h, void *userdata, clock_type clock = clock_type::MONOTONIC) - { - std::lock_guard guard(Base::lock); - queue_for_clock(clock).allocate(h, userdata); - } - - void removeTimer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept - { - std::lock_guard guard(Base::lock); - removeTimer_nolock(timer_id, clock); - } - - void removeTimer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept - { - timer_queue_t &timer_queue = queue_for_clock(clock); - - if (timer_queue.is_queued(timer_id)) { - timer_queue.remove(timer_id); - } - timer_queue.deallocate(timer_id); - } - // starts (if not started) a timer to timeout at the given time. Resets the expiry count to 0. // enable: specifies whether to enable reporting of timeouts/intervals - void setTimer(timer_handle_t &timer_id, time_val &timeouttv, struct timespec &interval, + void set_timer(timer_handle_t &timer_id, time_val &timeouttv, struct timespec &interval, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept { timespec timeout = timeouttv; std::lock_guard guard(Base::lock); - timer_queue_t &timer_queue = queue_for_clock(clock); + timer_queue_t &timer_queue = this->queue_for_clock(clock); timer_t &timer = timer_for_clock(clock); auto &ts = timer_queue.node_data(timer_id); @@ -176,7 +146,7 @@ template class PosixTimerEvents : public timer_base } // Set timer relative to current time: - void setTimerRel(timer_handle_t &timer_id, const time_val &timeouttv, const time_val &intervaltv, + void set_timer_rel(timer_handle_t &timer_id, const time_val &timeouttv, const time_val &intervaltv, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept { timespec timeout = timeouttv; @@ -192,29 +162,7 @@ template class PosixTimerEvents : public timer_base curtime.tv_nsec -= 1000000000; curtime.tv_sec++; } - setTimer(timer_id, curtime, interval, enable, clock); - } - - // Enables or disabling report of timeouts (does not stop timer) - void enableTimer(timer_handle_t &timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept - { - std::lock_guard guard(Base::lock); - enableTimer_nolock(timer_id, enable, clock); - } - - void enableTimer_nolock(timer_handle_t &timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept - { - timer_queue_t &timer_queue = queue_for_clock(clock); - - auto &node_data = timer_queue.node_data(timer_id); - auto expiry_count = node_data.expiry_count; - if (expiry_count != 0) { - node_data.expiry_count = 0; - Base::receiveTimerExpiry(timer_id, node_data.userdata, expiry_count); - } - else { - timer_queue.node_data(timer_id).enabled = enable; - } + set_timer(timer_id, curtime, interval, enable, clock); } void stop_timer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept @@ -225,7 +173,7 @@ template class PosixTimerEvents : public timer_base void stop_timer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept { - timer_queue_t &timer_queue = queue_for_clock(clock); + timer_queue_t &timer_queue = this->queue_for_clock(clock); timer_t &timer = timer_for_clock(clock); if (timer_queue.is_queued(timer_id)) { @@ -237,20 +185,7 @@ template class PosixTimerEvents : public timer_base } } - void get_time(timeval &tv, clock_type clock, bool force_update) noexcept - { - timespec ts; - get_time(ts, clock, force_update); - tv = ts; - } - - void get_time(timespec &ts, clock_type clock, bool force_update) noexcept - { - int posix_clock_id = (clock == clock_type::MONOTONIC) ? CLOCK_MONOTONIC : CLOCK_REALTIME; - clock_gettime(posix_clock_id, &ts); - } - - ~PosixTimerEvents() + ~posix_timer_events() { timer_delete(mono_timer); timer_delete(real_timer); diff --git a/src/dasynq/dasynq-svec.h b/src/dasynq/dasynq-svec.h deleted file mode 100644 index 32131d2..0000000 --- a/src/dasynq/dasynq-svec.h +++ /dev/null @@ -1,184 +0,0 @@ -#ifndef DASYNQ_SVEC_H_INCLUDED -#define DASYNQ_SVEC_H_INCLUDED - -#include -#include -#include -#include - -// Vector with possibility to shrink capacity arbitrarily - -namespace dasynq { - -template -class svector -{ - private: - T * array; - size_t size_v; - size_t capacity_v; - - void check_capacity() - { - if (size_v == capacity_v) { - // double capacity now: - if (capacity_v == 0) capacity_v = 1; - T * new_array = (T *) std::malloc(capacity_v * 2 * sizeof(T)); - if (new_array == nullptr) { - throw std::bad_alloc(); - } - for (size_t i = 0; i < size_v; i++) { - new (&new_array[i]) T(std::move(array[i])); - array[i].T::~T(); - } - std::free(array); - array = new_array; - capacity_v *= 2; - } - } - - public: - using size_type = size_t; - - svector() : array(nullptr), size_v(0), capacity_v(0) - { - - } - - svector(const svector &other) - { - capacity_v = other.size_v; - size_v = other.size_v; - array = (T *) std::malloc(capacity_v * sizeof(T)); - if (array == nullptr) { - throw std::bad_alloc(); - } - for (size_t i = 0; i < size_v; i++) { - new (&array[i]) T(other[i]); - } - } - - ~svector() - { - for (size_t i = 0; i < size_v; i++) { - array[i].T::~T(); - } - std::free(array); - } - - void push_back(const T &t) - { - check_capacity(); - new (&array[size_v]) T(t); - size_v++; - } - - void push_back(T &&t) - { - check_capacity(); - new (&array[size_v]) T(t); - size_v++; - } - - template - void emplace_back(U... args) - { - check_capacity(); - new (&array[size_v]) T(args...); - size_v++; - } - - void pop_back() - { - size_v--; - } - - T &operator[](size_t index) - { - return array[index]; - } - - const T &operator[](size_t index) const - { - return array[index]; - } - - size_t size() const - { - return size_v; - } - - size_t capacity() const - { - return capacity_v; - } - - bool empty() const - { - return size_v == 0; - } - - void reserve(size_t amount) - { - if (capacity_v < amount) { - T * new_array = (T *) std::malloc(amount * sizeof(T)); - if (new_array == nullptr) { - throw std::bad_alloc(); - } - for (size_t i = 0; i < size_v; i++) { - new (&new_array[i]) T(std::move(array[i])); - array[i].T::~T(); - } - std::free(array); - array = new_array; - capacity_v = amount; - } - } - - void shrink_to(size_t amount) - { - if (capacity_v > amount) { - T * new_array = (T *) std::malloc(amount * sizeof(T)); - if (new_array == nullptr) { - return; - } - for (size_t i = 0; i < size_v; i++) { - new (&new_array[i]) T(std::move(array[i])); - array[i].T::~T(); - } - std::free(array); - array = new_array; - capacity_v = amount; - } - } - - T &back() - { - return array[size_v - 1]; - } - - T* begin() - { - return array; - } - - const T *begin() const - { - return array; - } - - T* end() - { - return array + size_v; - } - - const T *end() const - { - return array + size_v; - } -}; - - -} // namespace - -#endif diff --git a/src/dasynq/dasynq-timerbase.h b/src/dasynq/dasynq-timerbase.h index 87ae20e..3e013dc 100644 --- a/src/dasynq/dasynq-timerbase.h +++ b/src/dasynq/dasynq-timerbase.h @@ -2,6 +2,7 @@ #define DASYNQ_TIMERBASE_H_INCLUDED #include +#include #include "dasynq-naryheap.h" @@ -16,40 +17,45 @@ class time_val using second_t = decltype(time.tv_sec); using nsecond_t = decltype(time.tv_nsec); - time_val() + time_val() noexcept { // uninitialised! } - time_val(const struct timespec &t) + time_val(const struct timespec &t) noexcept { time = t; } - time_val(second_t s, nsecond_t ns) + time_val(second_t s, nsecond_t ns) noexcept { time.tv_sec = s; time.tv_nsec = ns; } - second_t seconds() const { return time.tv_sec; } - nsecond_t nseconds() const { return time.tv_nsec; } + second_t seconds() const noexcept{ return time.tv_sec; } + nsecond_t nseconds() const noexcept { return time.tv_nsec; } - second_t & seconds() { return time.tv_sec; } - nsecond_t & nseconds() { return time.tv_nsec; } + second_t & seconds() noexcept { return time.tv_sec; } + nsecond_t & nseconds() noexcept { return time.tv_nsec; } - //void set_seconds(second_t s) { time.tv_sec = s; } - //void set_nseconds(nsecond_t ns) { time.tv_nsec = ns; } - //void dec_seconds() { time.tv_sec--; } - //void inc_seconds() { time.tv_sec++; } + timespec & get_timespec() noexcept + { + return time; + } + + const timespec & get_timespec() const noexcept + { + return time; + } - operator timespec() const + operator timespec() const noexcept { return time; } }; -inline time_val operator-(const time_val &t1, const time_val &t2) +inline time_val operator-(const time_val &t1, const time_val &t2) noexcept { time_val diff; diff.seconds() = t1.seconds() - t2.seconds(); @@ -63,7 +69,7 @@ inline time_val operator-(const time_val &t1, const time_val &t2) return diff; } -inline time_val operator+(const time_val &t1, const time_val &t2) +inline time_val operator+(const time_val &t1, const time_val &t2) noexcept { auto ns = t1.nseconds() + t2.nseconds(); auto s = t1.seconds() + t2.seconds(); @@ -75,7 +81,7 @@ inline time_val operator+(const time_val &t1, const time_val &t2) return time_val(s, ns); } -inline time_val &operator+=(time_val &t1, const time_val &t2) +inline time_val &operator+=(time_val &t1, const time_val &t2) noexcept { auto nsum = t1.nseconds() + t2.nseconds(); t1.seconds() = t1.seconds() + t2.seconds(); @@ -87,28 +93,88 @@ inline time_val &operator+=(time_val &t1, const time_val &t2) return t1; } -inline bool operator<(const time_val &t1, const time_val &t2) +inline time_val &operator-=(time_val &t1, const time_val &t2) noexcept +{ + time_val diff; + t1.seconds() = t1.seconds() - t2.seconds(); + if (t1.nseconds() >= t2.nseconds()) { + t1.nseconds() = t1.nseconds() - t2.nseconds(); + } + else { + t1.nseconds() = 1000000000 - t2.nseconds() + t1.nseconds(); + t1.seconds()--; + } + return t1; +} + +inline bool operator<(const time_val &t1, const time_val &t2) noexcept { if (t1.seconds() < t2.seconds()) return true; if (t1.seconds() == t2.seconds() && t1.nseconds() < t2.nseconds()) return true; return false; } -inline bool operator==(const time_val &t1, const time_val &t2) +inline bool operator==(const time_val &t1, const time_val &t2) noexcept { return (t1.seconds() == t2.seconds() && t1.nseconds() == t2.nseconds()); } -inline bool operator<=(const time_val &t1, const time_val &t2) +inline bool operator<=(const time_val &t1, const time_val &t2) noexcept { if (t1.seconds() < t2.seconds()) return true; if (t1.seconds() == t2.seconds() && t1.nseconds() <= t2.nseconds()) return true; return false; } -inline bool operator!=(const time_val &t1, const time_val &t2) { return !(t1 == t2); } -inline bool operator>(const time_val &t1, const time_val &t2) { return t2 < t1; } -inline bool operator>=(const time_val &t1, const time_val &t2) { return t2 <= t1; } +inline bool operator!=(const time_val &t1, const time_val &t2) noexcept { return !(t1 == t2); } +inline bool operator>(const time_val &t1, const time_val &t2) noexcept { return t2 < t1; } +inline bool operator>=(const time_val &t1, const time_val &t2) noexcept { return t2 <= t1; } + +static inline int divide_timespec(const struct timespec &num, const struct timespec &den, struct timespec &rem) noexcept; + +inline int operator/(const time_val &t1, const time_val &t2) noexcept +{ + struct timespec remainder; + return divide_timespec(t1.get_timespec(), t2.get_timespec(), remainder); +} + +inline time_val & operator<<=(time_val &t, int n) noexcept +{ + for (int i = 0; i < n; i++) { + t.seconds() *= 2; + t.nseconds() *= 2; + if (t.nseconds() >= 1000000000) { + t.nseconds() -= 1000000000; + t.seconds()++; + } + } + return t; +} + +inline time_val operator<<(time_val &t, int n) noexcept +{ + auto r = t; + r <<= n; + return r; +} + +inline time_val & operator>>=(time_val &t, int n) noexcept +{ + for (int i = 0; i < n; i++) { + bool low = t.seconds() & 1; + t.nseconds() /= 2; + t.nseconds() += low ? 500000000ULL : 0; + t.seconds() /= 2; + } + return t; +} + +inline time_val operator>>(time_val &t, int n) noexcept +{ + auto r = t; + r >>= n; + return r; +} // Data corresponding to a single timer class timer_data @@ -119,7 +185,7 @@ class timer_data bool enabled; // whether timer reports events void *userdata; - timer_data(void *udata = nullptr) : interval_time(0,0), expiry_count(0), enabled(true), userdata(udata) + timer_data(void *udata = nullptr) noexcept : interval_time(0,0), expiry_count(0), enabled(true), userdata(udata) { // constructor } @@ -128,7 +194,7 @@ class timer_data class compare_timespec { public: - bool operator()(const struct timespec &a, const struct timespec &b) + bool operator()(const struct timespec &a, const struct timespec &b) noexcept { if (a.tv_sec < b.tv_sec) { return true; @@ -142,7 +208,7 @@ class compare_timespec } }; -using timer_queue_t = NaryHeap; +using timer_queue_t = NaryHeap; using timer_handle_t = timer_queue_t::handle_t; static inline void init_timer_handle(timer_handle_t &hnd) noexcept @@ -150,7 +216,7 @@ static inline void init_timer_handle(timer_handle_t &hnd) noexcept timer_queue_t::init_handle(hnd); } -static inline int divide_timespec(const struct timespec &num, const struct timespec &den, struct timespec &rem) +static inline int divide_timespec(const struct timespec &num, const struct timespec &den, struct timespec &rem) noexcept { if (num.tv_sec < den.tv_sec) { rem = num; @@ -176,74 +242,86 @@ static inline int divide_timespec(const struct timespec &num, const struct times // At this point, num.tv_sec >= 1. - auto &r_sec = rem.tv_sec; - auto &r_nsec = rem.tv_nsec; - r_sec = num.tv_sec; - r_nsec = num.tv_nsec; - auto d_sec = den.tv_sec; - auto d_nsec = den.tv_nsec; + time_val n = { num.tv_sec, num.tv_nsec }; + time_val d = { den.tv_sec, den.tv_nsec }; + time_val r = n; - r_sec -= d_sec; - if (r_nsec >= d_nsec) { - r_nsec -= d_nsec; - } - else { - r_nsec += (1000000000ULL - d_nsec); - r_sec -= 1; - } + // starting with numerator, subtract 1*denominator + r -= d; // Check now for common case: one timer expiry with no overrun - if (r_sec < d_sec || (r_sec == d_sec && r_nsec < d_nsec)) { + if (r < d) { + rem = r; return 1; } int nval = 1; int rval = 1; // we have subtracted 1*D already - // shift denominator until it is greater than/equal to numerator: - while (d_sec < r_sec) { - d_sec *= 2; - d_nsec *= 2; - if (d_nsec >= 1000000000) { - d_nsec -= 1000000000; - d_sec++; - } + // shift denominator until it is greater than / roughly equal to numerator: + while (d.seconds() < r.seconds()) { + d <<= 1; nval *= 2; } while (nval > 0) { - if (d_sec < r_sec || (d_sec == r_sec && d_nsec <= r_nsec)) { - // subtract: - r_sec -= d_sec; - if (d_nsec > r_nsec) { - r_nsec += 1000000000; - r_sec--; - } - r_nsec -= d_nsec; - + if (d <= r) { + r -= d; rval += nval; } - bool low = d_sec & 1; - d_nsec /= 2; - d_nsec += low ? 500000000ULL : 0; - d_sec /= 2; + d >>= 1; nval /= 2; } + rem = r; return rval; } template class timer_base : public Base { + private: + timer_queue_t timer_queue; + +#if defined(CLOCK_MONOTONIC) + timer_queue_t mono_timer_queue; + protected: + inline timer_queue_t &queue_for_clock(clock_type clock) + { + if (clock == clock_type::MONOTONIC) { + return mono_timer_queue; + } + else { + return timer_queue; + } + } + + inline bool timer_queues_empty() + { + return timer_queue.empty() && mono_timer_queue.empty(); + } +#else + protected: + inline timer_queue_t &queue_for_clock(clock_type clock) + { + return timer_queue; + } + + inline bool timer_queues_empty() + { + return timer_queue.empty(); + } +#endif - void process_timer_queue(timer_queue_t &queue, const struct timespec &curtime) + // For the specified timer queue, issue expirations for all timers set to expire on or before the given + // time (curtime). The timer queue must not be empty. + void process_timer_queue(timer_queue_t &queue, const struct timespec &curtime) noexcept { // Peek timer queue; calculate difference between current time and timeout - const struct timespec * timeout = &queue.get_root_priority(); - while (timeout->tv_sec < curtime.tv_sec || (timeout->tv_sec == curtime.tv_sec && - timeout->tv_nsec <= curtime.tv_nsec)) { + const time_val * timeout = &queue.get_root_priority(); + time_val curtime_tv = curtime; + while (*timeout <= curtime_tv) { auto & thandle = queue.get_root(); timer_data &data = queue.node_data(thandle); time_val &interval = data.interval_time; @@ -255,7 +333,7 @@ template class timer_base : public Base data.enabled = false; int expiry_count = data.expiry_count; data.expiry_count = 0; - Base::receiveTimerExpiry(thandle, data.userdata, expiry_count); + Base::receive_timer_expiry(thandle, data.userdata, expiry_count); } if (queue.empty()) { break; @@ -279,7 +357,7 @@ template class timer_base : public Base data.enabled = false; int expiry_count = data.expiry_count; data.expiry_count = 0; - Base::receiveTimerExpiry(thandle, data.userdata, expiry_count); + Base::receive_timer_expiry(thandle, data.userdata, expiry_count); } } @@ -287,6 +365,71 @@ template class timer_base : public Base timeout = &queue.get_root_priority(); } } + + public: + + void get_time(time_val &tv, clock_type clock, bool force_update) noexcept + { + get_time(tv.get_timespec(), clock, force_update); + } + +#ifdef CLOCK_MONOTONIC + void get_time(timespec &ts, clock_type clock, bool force_update) noexcept + { + clockid_t posix_clock_id = (clock == clock_type::MONOTONIC) ? CLOCK_MONOTONIC : CLOCK_REALTIME; + clock_gettime(posix_clock_id, &ts); + } +#else + // If CLOCK_MONOTONIC is not defined, assume we only have gettimeofday(): + void get_time(timespec &ts, clock_type clock, bool force_update) noexcept + { + struct timeval curtime_tv; + gettimeofday(&curtime_tv, nullptr); + ts.tv_sec = curtime_tv.tv_sec; + ts.tv_nsec = curtime_tv.tv_usec * 1000; + } +#endif + + void add_timer_nolock(timer_handle_t &h, void *userdata, clock_type clock = clock_type::MONOTONIC) + { + this->queue_for_clock(clock).allocate(h, userdata); + } + + void remove_timer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept + { + std::lock_guard guard(Base::lock); + remove_timer_nolock(timer_id, clock); + } + + void remove_timer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept + { + auto &timer_queue = this->queue_for_clock(clock); + if (timer_queue.is_queued(timer_id)) { + timer_queue.remove(timer_id); + } + timer_queue.deallocate(timer_id); + } + + // Enables or disabling report of timeouts (does not stop timer) + void enable_timer(timer_handle_t &timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept + { + std::lock_guard guard(Base::lock); + enable_timer_nolock(timer_id, enable, clock); + } + + void enable_timer_nolock(timer_handle_t &timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept + { + auto &timer_queue = this->queue_for_clock(clock); + auto &node_data = timer_queue.node_data(timer_id); + auto expiry_count = node_data.expiry_count; + if (expiry_count != 0 && enable) { + node_data.expiry_count = 0; + Base::receive_timer_expiry(timer_id, node_data.userdata, expiry_count); + } + else { + timer_queue.node_data(timer_id).enabled = enable; + } + } }; } diff --git a/src/dasynq/dasynq-timerfd.h b/src/dasynq/dasynq-timerfd.h index 032c5da..025bbba 100644 --- a/src/dasynq/dasynq-timerfd.h +++ b/src/dasynq/dasynq-timerfd.h @@ -8,6 +8,8 @@ namespace dasynq { +// Timer implementation based on Linux's "timerfd". + // We could use one timerfd per timer, but then we need to differentiate timer // descriptors from regular file descriptors when events are reported by the loop // mechanism so that we can correctly report a timer event or fd event. @@ -17,14 +19,11 @@ namespace dasynq { // we are given a handle; we need to use this to modify the watch. We delegate the // process of allocating a handle to a priority heap implementation (BinaryHeap). -template class TimerFdEvents : public timer_base +template class timer_fd_events : public timer_base { private: int timerfd_fd = -1; int systemtime_fd = -1; - - timer_queue_t timer_queue; - timer_queue_t wallclock_queue; // Set the timerfd timeout to match the first timer in the queue (disable the timerfd // if there are no active timers). @@ -42,8 +41,9 @@ template class TimerFdEvents : public timer_base timerfd_settime(fd, TFD_TIMER_ABSTIME, &newtime, nullptr); } - void process_timer(clock_type clock, int fd, timer_queue_t &queue) noexcept + void process_timer(clock_type clock, int fd) noexcept { + timer_queue_t &queue = this->queue_for_clock(clock); struct timespec curtime; switch (clock) { case clock_type::SYSTEM: @@ -62,7 +62,7 @@ template class TimerFdEvents : public timer_base set_timer_from_queue(fd, queue); } - void setTimer(timer_handle_t & timer_id, const time_val &timeouttv, const time_val &intervaltv, + void set_timer(timer_handle_t & timer_id, const time_val &timeouttv, const time_val &intervaltv, timer_queue_t &queue, int fd, bool enable) noexcept { timespec timeout = timeouttv; @@ -88,30 +88,24 @@ template class TimerFdEvents : public timer_base } } - timer_queue_t & get_queue(clock_type clock) + public: + + class traits_t : public Base::traits_t { - switch(clock) { - case clock_type::SYSTEM: - return wallclock_queue; - case clock_type::MONOTONIC: - return timer_queue; - default: - DASYNQ_UNREACHABLE; - } - } + constexpr static bool full_timer_support = true; + }; - public: template - void receiveFdEvent(T &loop_mech, typename Base::FD_r fd_r, void * userdata, int flags) + void receive_fd_event(T &loop_mech, typename traits_t::fd_r fd_r_a, void * userdata, int flags) { if (userdata == &timerfd_fd) { - process_timer(clock_type::MONOTONIC, timerfd_fd, timer_queue); + process_timer(clock_type::MONOTONIC, timerfd_fd); } else if (userdata == &systemtime_fd) { - process_timer(clock_type::SYSTEM, systemtime_fd, wallclock_queue); + process_timer(clock_type::SYSTEM, systemtime_fd); } else { - Base::receiveFdEvent(loop_mech, fd_r, userdata, flags); + Base::receive_fd_event(loop_mech, fd_r_a, userdata, flags); } } @@ -128,8 +122,8 @@ template class TimerFdEvents : public timer_base } try { - loop_mech->addFdWatch(timerfd_fd, &timerfd_fd, IN_EVENTS); - loop_mech->addFdWatch(systemtime_fd, &systemtime_fd, IN_EVENTS); + loop_mech->add_fd_watch(timerfd_fd, &timerfd_fd, IN_EVENTS); + loop_mech->add_fd_watch(systemtime_fd, &systemtime_fd, IN_EVENTS); Base::init(loop_mech); } catch (...) { @@ -139,27 +133,6 @@ template class TimerFdEvents : public timer_base } } - // Add timer, store into given handle - void addTimer(timer_handle_t &h, void *userdata, clock_type clock = clock_type::MONOTONIC) - { - std::lock_guard guard(Base::lock); - timer_queue_t & queue = get_queue(clock); - queue.allocate(h, userdata); - } - - void removeTimer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept - { - std::lock_guard guard(Base::lock); - removeTimer_nolock(timer_id, clock); - } - - void removeTimer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept - { - stop_timer_nolock(timer_id, clock); - timer_queue_t & queue = get_queue(clock); - queue.deallocate(timer_id); - } - void stop_timer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept { std::lock_guard guard(Base::lock); @@ -168,10 +141,10 @@ template class TimerFdEvents : public timer_base void stop_timer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept { - timer_queue_t & queue = get_queue(clock); + timer_queue_t & queue = this->queue_for_clock(clock); int fd = (clock == clock_type::MONOTONIC) ? timerfd_fd : systemtime_fd; if (queue.is_queued(timer_id)) { - bool was_first = (&timer_queue.get_root()) == &timer_id; + bool was_first = (&queue.get_root()) == &timer_id; queue.remove(timer_id); if (was_first) { set_timer_from_queue(fd, queue); @@ -181,18 +154,19 @@ template class TimerFdEvents : public timer_base // starts (if not started) a timer to timeout at the given time. Resets the expiry count to 0. // enable: specifies whether to enable reporting of timeouts/intervals - void setTimer(timer_handle_t & timer_id, const time_val &timeouttv, const time_val &intervaltv, + void set_timer(timer_handle_t & timer_id, const time_val &timeouttv, const time_val &intervaltv, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept { timespec timeout = timeouttv; timespec interval = intervaltv; + timer_queue_t queue = this->queue_for_clock(clock); switch (clock) { case clock_type::SYSTEM: - setTimer(timer_id, timeout, interval, wallclock_queue, systemtime_fd, enable); + set_timer(timer_id, timeout, interval, queue, systemtime_fd, enable); break; case clock_type::MONOTONIC: - setTimer(timer_id, timeout, interval, timer_queue, timerfd_fd, enable); + set_timer(timer_id, timeout, interval, queue, timerfd_fd, enable); break; default: DASYNQ_UNREACHABLE; @@ -200,73 +174,17 @@ template class TimerFdEvents : public timer_base } // Set timer relative to current time: - void setTimerRel(timer_handle_t & timer_id, const time_val &timeouttv, const time_val &intervaltv, + void set_timer_rel(timer_handle_t & timer_id, const time_val &timeout, const time_val &interval, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept { - timespec timeout = timeouttv; - timespec interval = intervaltv; - - clockid_t sclock; - switch (clock) { - case clock_type::SYSTEM: - sclock = CLOCK_REALTIME; - break; - case clock_type::MONOTONIC: - sclock = CLOCK_MONOTONIC; - break; - default: - DASYNQ_UNREACHABLE; - } + time_val alarmtime; + this->get_time(alarmtime, clock, false); + alarmtime += timeout; - // TODO consider caching current time somehow; need to decide then when to update cached value. - struct timespec curtime; - clock_gettime(sclock, &curtime); - curtime.tv_sec += timeout.tv_sec; - curtime.tv_nsec += timeout.tv_nsec; - if (curtime.tv_nsec > 1000000000) { - curtime.tv_nsec -= 1000000000; - curtime.tv_sec++; - } - - setTimer(timer_id, curtime, interval, enable, clock); - } - - // Enables or disabling report of timeouts (does not stop timer) - void enableTimer(timer_handle_t & timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept - { - std::lock_guard guard(Base::lock); - enableTimer_nolock(timer_id, enable, clock); + set_timer(timer_id, alarmtime, interval, enable, clock); } - void enableTimer_nolock(timer_handle_t & timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept - { - timer_queue_t & queue = get_queue(clock); - - auto &node_data = queue.node_data(timer_id); - auto expiry_count = node_data.expiry_count; - if (expiry_count != 0) { - node_data.expiry_count = 0; - Base::receiveTimerExpiry(timer_id, node_data.userdata, expiry_count); - } - else { - queue.node_data(timer_id).enabled = enable; - } - } - - void get_time(time_val &tv, clock_type clock, bool force_update) noexcept - { - timespec ts; - get_time(ts, clock, force_update); - tv = ts; - } - - void get_time(timespec &ts, clock_type clock, bool force_update) noexcept - { - int posix_clock_id = (clock == clock_type::MONOTONIC) ? CLOCK_MONOTONIC : CLOCK_REALTIME; - clock_gettime(posix_clock_id, &ts); - } - - ~TimerFdEvents() + ~timer_fd_events() { close(timerfd_fd); close(systemtime_fd); diff --git a/src/dasynq/dasynq-util.h b/src/dasynq/dasynq-util.h new file mode 100644 index 0000000..b14c81d --- /dev/null +++ b/src/dasynq/dasynq-util.h @@ -0,0 +1,41 @@ +#ifndef DASYNQ_UTIL_H_INCLUDED +#define DASYNQ_UTIL_H_INCLUDED 1 + +#include +#include + +namespace dasynq { + +// Define pipe2, if it's not present in the sytem library. pipe2 is like pipe with an additional flags +// argument which can set file/descriptor flags atomically. The emulated version that we generate cannot +// do this atomically, of course. + +#if DASYNQ_HAVE_PIPE2 == 0 +inline int pipe2(int filedes[2], int flags) +{ + if (pipe(filedes) == -1) { + return -1; + } + + if (flags & O_CLOEXEC) { + fcntl(filedes[0], F_SETFD, FD_CLOEXEC); + fcntl(filedes[1], F_SETFD, FD_CLOEXEC); + } + + if (flags & O_NONBLOCK) { + fcntl(filedes[0], F_SETFL, O_NONBLOCK); + fcntl(filedes[1], F_SETFL, O_NONBLOCK); + } + + return 0; +} +#else + +using ::pipe2; + +#endif + + +} + +#endif diff --git a/src/dasynq/dasynq.h b/src/dasynq/dasynq.h index bff5a0d..c471fa4 100644 --- a/src/dasynq/dasynq.h +++ b/src/dasynq/dasynq.h @@ -6,6 +6,7 @@ #include "dasynq-flags.h" #include "dasynq-naryheap.h" #include "dasynq-interrupt.h" +#include "dasynq-util.h" // Dasynq uses a "mix-in" pattern to produce an event loop implementation incorporating selectable implementations of // various components (main backend, timers, child process watch mechanism etc). In C++ this can be achieved by @@ -16,7 +17,7 @@ // We can chain several such components together (and so so below) to "mix in" the functionality of each into the final // class, eg: // -// template using Loop = EpollLoop>>>; +// template using Loop = epoll_loop>>>; // // (which defines an alias template "Loop", whose implementation will use the epoll backend, a standard interrupt channel // implementation, a timerfd-based timer implementation, and the standard child process watch implementation). @@ -31,39 +32,39 @@ // Base::init(derived); // } // -// At the base all this is the EventDispatch class, defined below, which receives event notifications and inserts +// At the base all this is the event_dispatch class, defined below, which receives event notifications and inserts // them into a queue for processing. The event_loop class, also below, wraps this (via composition) in an interface // which can be used to register/de-regsiter/enable/disable event watchers, and which can process the queued events // by calling the watcher callbacks. The event_loop class also provides some synchronisation to ensure thread-safety. -#if defined(DASYNQ_CUSTOM_LOOP_IMPLEMENTATION) -// Loop and LoopTraits defined already; used for testing -#elif defined(DASYNQ_HAVE_KQUEUE) +#if DASYNQ_HAVE_KQUEUE #include "dasynq-kqueue.h" #if _POSIX_TIMERS > 0 #include "dasynq-posixtimer.h" namespace dasynq { - template using TimerEvents = PosixTimerEvents; + template using timer_events = posix_timer_events; } #else #include "dasynq-itimer.h" namespace dasynq { - template using TimerEvents = ITimerEvents; + template using timer_events = itimer_events; } #endif #include "dasynq-childproc.h" namespace dasynq { - template using Loop = KqueueLoop>>>; - using LoopTraits = KqueueTraits; + template using loop_t = kqueue_loop>>>; + using loop_traits_t = kqueue_traits; } -#elif defined(DASYNQ_HAVE_EPOLL) +#elif DASYNQ_HAVE_EPOLL #include "dasynq-epoll.h" #include "dasynq-timerfd.h" #include "dasynq-childproc.h" namespace dasynq { - template using Loop = EpollLoop>>>; - using LoopTraits = EpollTraits; + template using loop_t = epoll_loop>>>; + using loop_traits_t = epoll_traits; } +#else +#error No loop backened defined - see dasynq-config.h #endif #include @@ -77,38 +78,9 @@ namespace dasynq { #include "dasynq-mutex.h" -namespace dasynq { - -#if HAVE_PIPE2 == 0 -inline int pipe2(int filedes[2], int flags) -{ - if (pipe(filedes) == -1) { - return -1; - } +#include "dasynq-basewatchers.h" - if (flags & O_CLOEXEC) { - fcntl(filedes[0], F_SETFD, FD_CLOEXEC); - fcntl(filedes[1], F_SETFD, FD_CLOEXEC); - } - - if (flags & O_NONBLOCK) { - fcntl(filedes[0], F_SETFL, O_NONBLOCK); - fcntl(filedes[1], F_SETFL, O_NONBLOCK); - } - - return 0; -} -#endif - -namespace dprivate { - class base_watcher; -} - -using PrioQueue = NaryHeap; - -inline namespace { - constexpr int DEFAULT_PRIORITY = 50; -} +namespace dasynq { /** * Values for rearm/disarm return from event handlers @@ -129,195 +101,7 @@ enum class rearm REQUEUE }; -// Forward declarations: -template class Loop = dasynq::Loop, typename LoopTraits = dasynq::LoopTraits> -class event_loop; - namespace dprivate { - // (non-public API) - - template class fd_watcher; - template class bidi_fd_watcher; - template class signal_watcher; - template class child_proc_watcher; - template class timer; - - template class fd_watcher_impl; - template class bidi_fd_watcher_impl; - template class signal_watcher_impl; - template class child_proc_watcher_impl; - template class timer_impl; - - enum class watch_type_t - { - SIGNAL, - FD, - CHILD, - SECONDARYFD, - TIMER - }; - - template class EventDispatch; - - // For FD watchers: - // Use this watch flag to indicate that in and out events should be reported separately, - // that is, watcher should not be disabled until all watched event types are queued. - constexpr static int multi_watch = 4; - - // Represents a queued event notification. Various event watchers derive from this type. - class base_watcher - { - template friend class EventDispatch; - template class, typename> friend class dasynq::event_loop; - friend inline void basewatcher_set_active(base_watcher &watcher, bool active); - friend inline bool basewatcher_get_deleteme(const base_watcher &watcher); - friend inline bool basewatcher_get_emulatefd(const base_watcher &watcher); - - protected: - watch_type_t watchType; - int active : 1; // currently executing handler? - int deleteme : 1; // delete when handler finished? - int emulatefd : 1; // emulate file watch (by re-queueing) - int emulate_enabled : 1; // whether an emulated watch is enabled - - PrioQueue::handle_t heap_handle; - int priority; - - static void set_priority(base_watcher &p, int prio) - { - p.priority = prio; - } - - public: - - // Perform initialisation necessary before registration with an event loop - void init() - { - active = false; - deleteme = false; - emulatefd = false; - emulate_enabled = false; - PrioQueue::init_handle(heap_handle); - priority = DEFAULT_PRIORITY; - } - - base_watcher(watch_type_t wt) noexcept : watchType(wt) { } - - virtual void dispatch(void *loop_ptr) noexcept { }; - virtual void dispatch_second(void *loop_ptr) noexcept { } - - virtual ~base_watcher() noexcept { } - - // Called when the watcher has been removed. - // It is guaranteed by the caller that: - // - the dispatch method is not currently running - // - the dispatch method will not be called. - virtual void watch_removed() noexcept - { - // TODO this "delete" behaviour could be dependent on a flag, perhaps? - // delete this; - } - }; - - inline void basewatcher_set_active(base_watcher &watcher, bool active) - { - watcher.active = active; - } - - inline bool basewatcher_get_deleteme(const base_watcher &watcher) - { - return watcher.deleteme; - } - - inline bool basewatcher_get_emulatefd(const base_watcher &watcher) - { - return watcher.emulatefd; - } - - // Base signal event - not part of public API - template - class base_signal_watcher : public base_watcher - { - friend class EventDispatch; - template class, typename> friend class dasynq::event_loop; - - protected: - typename Traits::SigInfo siginfo; - base_signal_watcher() : base_watcher(watch_type_t::SIGNAL) { } - - public: - using siginfo_t = typename Traits::SigInfo; - typedef siginfo_t &siginfo_p; - }; - - template - class base_fd_watcher : public base_watcher - { - template friend class EventDispatch; - template class, typename> friend class dasynq::event_loop; - - protected: - int watch_fd; - - // These flags are protected by the loop's internal lock: - int watch_flags; // events being watched - int event_flags; // events pending (queued) - - // watch_flags: for a regular fd_watcher, this specifies the events that the watcher - // is watching (or was watching if disabled). For a bidi_fd_watcher, specifies - // the events that the watcher is currently watching (i.e. specifies which - // halves of the Bidi watcher are enabled). - - base_fd_watcher() noexcept : base_watcher(watch_type_t::FD) { } - }; - - template - class base_bidi_fd_watcher : public base_fd_watcher - { - template friend class EventDispatch; - template class, typename> friend class dasynq::event_loop; - - protected: - - // The main instance is the "input" watcher only; we keep a secondary watcher - // with a secondary set of flags for the "output" watcher: - base_watcher outWatcher {watch_type_t::SECONDARYFD}; - - int read_removed : 1; // read watch removed? - int write_removed : 1; // write watch removed? - }; - - template - class base_child_watcher : public base_watcher - { - template friend class EventDispatch; - template class, typename> friend class dasynq::event_loop; - - protected: - pid_watch_handle_t watch_handle; - pid_t watch_pid; - int child_status; - - base_child_watcher() : base_watcher(watch_type_t::CHILD) { } - }; - - - template - class base_timer_watcher : public base_watcher - { - template friend class EventDispatch; - template class, typename> friend class dasynq::event_loop; - - protected: - timer_handle_t timer_handle; - int intervals; - clock_type clock; - - base_timer_watcher() : base_watcher(watch_type_t::TIMER) - { - init_timer_handle(timer_handle); - } - }; // Classes for implementing a fair(ish) wait queue. // A queue node can be signalled when it reaches the head of @@ -329,15 +113,15 @@ namespace dprivate { // Select an appropriate condition variable type for a mutex: // condition_variable if mutex is std::mutex, or condition_variable_any // otherwise. - template class condvarSelector; + template class condvar_selector; - template <> class condvarSelector + template <> class condvar_selector { public: typedef std::condition_variable condvar; }; - template class condvarSelector + template class condvar_selector { public: typedef std::condition_variable_any condvar; @@ -357,9 +141,11 @@ namespace dprivate { template class waitqueue_node { - typename condvarSelector::condvar condvar; + typename condvar_selector::condvar condvar; friend class waitqueue; - waitqueue_node * next = nullptr; + + // ptr to next node in queue, set to null when added to queue tail: + waitqueue_node * next; public: void signal() @@ -376,22 +162,23 @@ namespace dprivate { template <> class waitqueue { public: + // remove current head of queue, return new head: waitqueue_node * unqueue() { return nullptr; } - waitqueue_node * getHead() + waitqueue_node * get_head() { return nullptr; } - bool checkHead(waitqueue_node &node) + bool check_head(waitqueue_node &node) { return true; } - bool isEmpty() + bool is_empty() { return true; } @@ -407,6 +194,7 @@ namespace dprivate { waitqueue_node * head = nullptr; public: + // remove current head of queue, return new head: waitqueue_node * unqueue() { head = head->next; @@ -416,23 +204,24 @@ namespace dprivate { return head; } - waitqueue_node * getHead() + waitqueue_node * get_head() { return head; } - bool checkHead(waitqueue_node &node) + bool check_head(waitqueue_node &node) { return head == &node; } - bool isEmpty() + bool is_empty() { return head == nullptr; } void queue(waitqueue_node *node) { + node->next = nullptr; if (tail) { tail->next = node; } @@ -445,53 +234,59 @@ namespace dprivate { // Do standard post-dispatch processing for a watcher. This handles the case of removing or // re-queing watchers depending on the rearm type. - template void post_dispatch(Loop &loop, base_watcher *watcher, rearm rearmType) + template void post_dispatch(Loop &loop, base_watcher *watcher, rearm rearm_type) { - if (rearmType == rearm::REMOVE) { - loop.getBaseLock().unlock(); + if (rearm_type == rearm::REMOVE) { + loop.get_base_lock().unlock(); watcher->watch_removed(); - loop.getBaseLock().lock(); + loop.get_base_lock().lock(); } - else if (rearmType == rearm::REQUEUE) { + else if (rearm_type == rearm::REQUEUE) { loop.requeue_watcher(watcher); } } // This class serves as the base class (mixin) for the AEN mechanism class. // - // The EventDispatch class maintains the queued event data structures. It inserts watchers - // into the queue when events are received (receiveXXX methods). - template class EventDispatch : public Traits + // The event_dispatch class maintains the queued event data structures. It inserts watchers + // into the queue when events are received (receiveXXX methods). It also owns a mutex used + // to protect those structures. + // + // In general the methods should be called with lock held. In practice this means that the + // event loop backend implementations must obtain the lock; they are also free to use it to + // protect their own internal data structures. + template class event_dispatch { - template class, typename> friend class dasynq::event_loop; + friend class dasynq::event_loop;; + + public: + using mutex_t = T_Mutex; + using traits_t = Traits; + + private: // queue data structure/pointer - PrioQueue event_queue; + prio_queue event_queue; - using BaseSignalWatcher = dasynq::dprivate::base_signal_watcher; - using BaseFdWatcher = dasynq::dprivate::base_fd_watcher; - using BaseBidiFdWatcher = dasynq::dprivate::base_bidi_fd_watcher; - using BaseChildWatcher = dasynq::dprivate::base_child_watcher; - using BaseTimerWatcher = dasynq::dprivate::base_timer_watcher; + using base_signal_watcher = dasynq::dprivate::base_signal_watcher; + using base_fd_watcher = dasynq::dprivate::base_fd_watcher; + using base_bidi_fd_watcher = dasynq::dprivate::base_bidi_fd_watcher; + using base_child_watcher = dasynq::dprivate::base_child_watcher; + using base_timer_watcher = dasynq::dprivate::base_timer_watcher; - // Add a watcher into the queuing system (but don't queue it) + // Add a watcher into the queueing system (but don't queue it). Call with lock held. // may throw: std::bad_alloc void prepare_watcher(base_watcher *bwatcher) { event_queue.allocate(bwatcher->heap_handle, bwatcher); } - void queueWatcher(base_watcher *bwatcher) noexcept + void queue_watcher(base_watcher *bwatcher) noexcept { event_queue.insert(bwatcher->heap_handle, bwatcher->priority); } - bool isQueued(base_watcher *bwatcher) noexcept - { - return event_queue.is_queued(bwatcher->heap_handle); - } - - void dequeueWatcher(base_watcher *bwatcher) noexcept + void dequeue_watcher(base_watcher *bwatcher) noexcept { if (event_queue.is_queued(bwatcher->heap_handle)) { event_queue.remove(bwatcher->heap_handle); @@ -505,24 +300,30 @@ namespace dprivate { } protected: - T_Mutex lock; + mutex_t lock; template void init(T *loop) noexcept { } - // Receive a signal; return true to disable signal watch or false to leave enabled + void sigmaskf(int how, const sigset_t *set, sigset_t *oset) + { + LoopTraits::sigmaskf(how, set, oset); + } + + // Receive a signal; return true to disable signal watch or false to leave enabled. + // Called with lock held. template - bool receive_signal(T &loop_mech, typename Traits::SigInfo & siginfo, void * userdata) noexcept + bool receive_signal(T &loop_mech, typename Traits::sigdata_t & siginfo, void * userdata) noexcept { - BaseSignalWatcher * bwatcher = static_cast(userdata); + base_signal_watcher * bwatcher = static_cast(userdata); bwatcher->siginfo = siginfo; - queueWatcher(bwatcher); + queue_watcher(bwatcher); return true; } template - void receiveFdEvent(T &loop_mech, typename Traits::FD_r fd_r, void * userdata, int flags) noexcept + void receive_fd_event(T &loop_mech, typename Traits::fd_r fd_r, void * userdata, int flags) noexcept { - BaseFdWatcher * bfdw = static_cast(userdata); + base_fd_watcher * bfdw = static_cast(userdata); bfdw->event_flags |= flags; @@ -530,49 +331,52 @@ namespace dprivate { bool is_multi_watch = bfdw->watch_flags & multi_watch; if (is_multi_watch) { - BaseBidiFdWatcher *bbdw = static_cast(bwatcher); + base_bidi_fd_watcher *bbdw = static_cast(bwatcher); bbdw->watch_flags &= ~flags; if ((flags & IN_EVENTS) && (flags & OUT_EVENTS)) { // Queue the secondary watcher first: - queueWatcher(&bbdw->outWatcher); + queue_watcher(&bbdw->out_watcher); } else if (flags & OUT_EVENTS) { // Use the secondary watcher for queueing: - bwatcher = &(bbdw->outWatcher); + bwatcher = &(bbdw->out_watcher); } } - queueWatcher(bwatcher); + queue_watcher(bwatcher); - if (! LoopTraits::has_separate_rw_fd_watches) { + if (! traits_t::has_separate_rw_fd_watches) { // If this is a bidirectional fd-watch, it has been disabled in *both* directions // as the event was delivered. However, the other direction should not be disabled // yet, so we need to re-enable: int in_out_mask = IN_EVENTS | OUT_EVENTS; if (is_multi_watch && (bfdw->watch_flags & in_out_mask) != 0) { // We need to re-enable the other channel now: - loop_mech.enableFdWatch_nolock(bfdw->watch_fd, userdata, + loop_mech.enable_fd_watch_nolock(bfdw->watch_fd, userdata, (bfdw->watch_flags & in_out_mask) | ONE_SHOT); } } } - void receiveChildStat(pid_t child, int status, void * userdata) noexcept + // Child process terminated. Called with both the main lock and the reaper lock held. + void receive_child_stat(pid_t child, int status, void * userdata) noexcept { - BaseChildWatcher * watcher = static_cast(userdata); + base_child_watcher * watcher = static_cast(userdata); watcher->child_status = status; - queueWatcher(watcher); + watcher->child_termd = true; + queue_watcher(watcher); } - void receiveTimerExpiry(timer_handle_t & timer_handle, void * userdata, int intervals) noexcept + void receive_timer_expiry(timer_handle_t & timer_handle, void * userdata, int intervals) noexcept { - BaseTimerWatcher * watcher = static_cast(userdata); - watcher->intervals = intervals; - queueWatcher(watcher); + base_timer_watcher * watcher = static_cast(userdata); + watcher->intervals += intervals; + queue_watcher(watcher); } // Pull a single event from the queue; returns nullptr if the queue is empty. - base_watcher * pullEvent() noexcept + // Call with lock held. + base_watcher * pull_event() noexcept { if (event_queue.empty()) { return nullptr; @@ -584,7 +388,9 @@ namespace dprivate { return r; } - void issueDelete(base_watcher *watcher) noexcept + // Queue a watcher for reomval, or issue "removed" callback to it. + // Call with lock free. + void issue_delete(base_watcher *watcher) noexcept { // This is only called when the attention lock is held, so if the watcher is not // active/queued now, it cannot become active (and will not be reported with an event) @@ -601,7 +407,7 @@ namespace dprivate { } else { // Actually do the delete. - dequeueWatcher(watcher); + dequeue_watcher(watcher); release_watcher(watcher); lock.unlock(); @@ -609,7 +415,9 @@ namespace dprivate { } } - void issueDelete(BaseBidiFdWatcher *watcher) noexcept + // Queue a watcher for reomval, or issue "removed" callback to it. + // Call with lock free. + void issue_delete(base_bidi_fd_watcher *watcher) noexcept { lock.lock(); @@ -618,18 +426,18 @@ namespace dprivate { release_watcher(watcher); } else { - dequeueWatcher(watcher); + dequeue_watcher(watcher); release_watcher(watcher); watcher->read_removed = true; } - base_watcher *secondary = &(watcher->outWatcher); + base_watcher *secondary = &(watcher->out_watcher); if (secondary->active) { secondary->deleteme = true; release_watcher(watcher); } else { - dequeueWatcher(secondary); + dequeue_watcher(secondary); release_watcher(watcher); watcher->write_removed = true; } @@ -642,17 +450,16 @@ namespace dprivate { lock.unlock(); } } - - public: - using mutex_t = T_Mutex; }; } - -template class Loop, typename LoopTraits> +// This is the main event_loop implementation. It serves as an interface to the event loop +// backend (of which it maintains an internal instance). It also serialises waits the backend +// and provides safe deletion of watchers (see comments inline). +template class event_loop { - using my_event_loop_t = event_loop; + using my_event_loop_t = event_loop; friend class dprivate::fd_watcher; friend class dprivate::bidi_fd_watcher; friend class dprivate::signal_watcher; @@ -660,7 +467,7 @@ class event_loop friend class dprivate::timer; friend void dprivate::post_dispatch(my_event_loop_t &loop, - dprivate::base_watcher *watcher, rearm rearmType); + dprivate::base_watcher *watcher, rearm rearm_type); template friend class dprivate::fd_watcher_impl; template friend class dprivate::bidi_fd_watcher_impl; @@ -668,23 +475,29 @@ class event_loop template friend class dprivate::child_proc_watcher_impl; template friend class dprivate::timer_impl; + using backend_traits_t = typename Traits::backend_traits_t; + + template using event_dispatch = dprivate::event_dispatch; + using loop_mech_t = typename Traits::template backend_t>; + using reaper_mutex_t = typename loop_mech_t::reaper_mutex_t; + public: - using loop_traits_t = LoopTraits; - using time_val = dasynq::time_val; + using traits_t = Traits; + using loop_traits_t = typename loop_mech_t::traits_t; + using mutex_t = T_Mutex; private: - template using EventDispatch = dprivate::EventDispatch; template using waitqueue = dprivate::waitqueue; template using waitqueue_node = dprivate::waitqueue_node; - using BaseWatcher = dprivate::base_watcher; - using BaseSignalWatcher = dprivate::base_signal_watcher; - using BaseFdWatcher = dprivate::base_fd_watcher; - using BaseBidiFdWatcher = dprivate::base_bidi_fd_watcher; - using BaseChildWatcher = dprivate::base_child_watcher; - using BaseTimerWatcher = dprivate::base_timer_watcher; + using base_watcher = dprivate::base_watcher; + using base_signal_watcher = dprivate::base_signal_watcher; + using base_fd_watcher = dprivate::base_fd_watcher; + using base_bidi_fd_watcher = dprivate::base_bidi_fd_watcher; + using base_child_watcher = dprivate::base_child_watcher; + using base_timer_watcher = dprivate::base_timer_watcher; using watch_type_t = dprivate::watch_type_t; - - Loop> loop_mech; + + loop_mech_t loop_mech; // There is a complex problem with most asynchronous event notification mechanisms // when used in a multi-threaded environment. Generally, a file descriptor or other @@ -729,21 +542,29 @@ class event_loop // - The mutex only protects manipulation of the wait queues, and so should not // be highly contended. - T_Mutex wait_lock; // wait lock, used to prevent multiple threads from waiting + mutex_t wait_lock; // wait lock, used to prevent multiple threads from waiting // on the event queue simultaneously. - waitqueue attn_waitqueue; - waitqueue wait_waitqueue; + waitqueue attn_waitqueue; + waitqueue wait_waitqueue; - T_Mutex &getBaseLock() noexcept + mutex_t &get_base_lock() noexcept { return loop_mech.lock; } - void registerSignal(BaseSignalWatcher *callBack, int signo) + reaper_mutex_t &get_reaper_lock() noexcept + { + return loop_mech.get_reaper_lock(); + } + + void register_signal(base_signal_watcher *callBack, int signo) { + auto & ed = (event_dispatch &) loop_mech; + std::lock_guard guard(ed.lock); + loop_mech.prepare_watcher(callBack); try { - loop_mech.addSignalWatch(signo, callBack); + loop_mech.add_signal_watch_nolock(signo, callBack); } catch (...) { loop_mech.release_watcher(callBack); @@ -751,24 +572,28 @@ class event_loop } } - void deregister(BaseSignalWatcher *callBack, int signo) noexcept + void deregister(base_signal_watcher *callBack, int signo) noexcept { - loop_mech.removeSignalWatch(signo); + loop_mech.remove_signal_watch(signo); waitqueue_node qnode; get_attn_lock(qnode); - EventDispatch & ed = (EventDispatch &) loop_mech; - ed.issueDelete(callBack); + auto & ed = (event_dispatch &) loop_mech; + ed.issue_delete(callBack); release_lock(qnode); } - void registerFd(BaseFdWatcher *callback, int fd, int eventmask, bool enabled, bool emulate = false) + void register_fd(base_fd_watcher *callback, int fd, int eventmask, bool enabled, bool emulate = false) { + auto & ed = (event_dispatch &) loop_mech; + std::lock_guard guard(ed.lock); + loop_mech.prepare_watcher(callback); + try { - if (! loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, enabled, emulate)) { + if (! loop_mech.add_fd_watch(fd, callback, eventmask | ONE_SHOT, enabled, emulate)) { callback->emulatefd = true; callback->emulate_enabled = enabled; if (enabled) { @@ -785,14 +610,17 @@ class event_loop } } - void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask, bool emulate = false) + void register_fd(base_bidi_fd_watcher *callback, int fd, int eventmask, bool emulate = false) { + auto & ed = (event_dispatch &) loop_mech; + std::lock_guard guard(ed.lock); + loop_mech.prepare_watcher(callback); try { - loop_mech.prepare_watcher(&callback->outWatcher); + loop_mech.prepare_watcher(&callback->out_watcher); try { - if (LoopTraits::has_separate_rw_fd_watches) { - int r = loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT, emulate); + if (backend_traits_t::has_separate_rw_fd_watches) { + int r = loop_mech.add_bidi_fd_watch(fd, callback, eventmask | ONE_SHOT, emulate); if (r & IN_EVENTS) { callback->emulatefd = true; if (eventmask & IN_EVENTS) { @@ -800,27 +628,27 @@ class event_loop } } if (r & OUT_EVENTS) { - callback->outWatcher.emulatefd = true; + callback->out_watcher.emulatefd = true; if (eventmask & OUT_EVENTS) { - requeue_watcher(&callback->outWatcher); + requeue_watcher(&callback->out_watcher); } } } else { - if (! loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, true, emulate)) { + if (! loop_mech.add_fd_watch(fd, callback, eventmask | ONE_SHOT, true, emulate)) { callback->emulatefd = true; - callback->outWatcher.emulatefd = true; + callback->out_watcher.emulatefd = true; if (eventmask & IN_EVENTS) { requeue_watcher(callback); } if (eventmask & OUT_EVENTS) { - requeue_watcher(&callback->outWatcher); + requeue_watcher(&callback->out_watcher); } } } } catch (...) { - loop_mech.release_watcher(&callback->outWatcher); + loop_mech.release_watcher(&callback->out_watcher); throw; } } @@ -830,68 +658,71 @@ class event_loop } } - void setFdEnabled(BaseWatcher *watcher, int fd, int watch_flags, bool enabled) noexcept + void set_fd_enabled(base_watcher *watcher, int fd, int watch_flags, bool enabled) noexcept { if (enabled) { - loop_mech.enableFdWatch(fd, watcher, watch_flags | ONE_SHOT); + loop_mech.enable_fd_watch(fd, watcher, watch_flags | ONE_SHOT); } else { - loop_mech.disableFdWatch(fd, watch_flags); + loop_mech.disable_fd_watch(fd, watch_flags); } } - void setFdEnabled_nolock(BaseWatcher *watcher, int fd, int watch_flags, bool enabled) noexcept + void set_fd_enabled_nolock(base_watcher *watcher, int fd, int watch_flags, bool enabled) noexcept { if (enabled) { - loop_mech.enableFdWatch_nolock(fd, watcher, watch_flags | ONE_SHOT); + loop_mech.enable_fd_watch_nolock(fd, watcher, watch_flags | ONE_SHOT); } else { - loop_mech.disableFdWatch_nolock(fd, watch_flags); + loop_mech.disable_fd_watch_nolock(fd, watch_flags); } } - void deregister(BaseFdWatcher *callback, int fd) noexcept + void deregister(base_fd_watcher *callback, int fd) noexcept { if (callback->emulatefd) { - auto & ed = (EventDispatch &) loop_mech; - ed.issueDelete(callback); + auto & ed = (event_dispatch &) loop_mech; + ed.issue_delete(callback); return; } - loop_mech.removeFdWatch(fd, callback->watch_flags); + loop_mech.remove_fd_watch(fd, callback->watch_flags); waitqueue_node qnode; get_attn_lock(qnode); - auto & ed = (EventDispatch &) loop_mech; - ed.issueDelete(callback); + auto & ed = (event_dispatch &) loop_mech; + ed.issue_delete(callback); release_lock(qnode); } - void deregister(BaseBidiFdWatcher *callback, int fd) noexcept + void deregister(base_bidi_fd_watcher *callback, int fd) noexcept { - if (LoopTraits::has_separate_rw_fd_watches) { - loop_mech.removeBidiFdWatch(fd); + if (backend_traits_t::has_separate_rw_fd_watches) { + loop_mech.remove_bidi_fd_watch(fd); } else { - loop_mech.removeFdWatch(fd, callback->watch_flags); + loop_mech.remove_fd_watch(fd, callback->watch_flags); } waitqueue_node qnode; get_attn_lock(qnode); - EventDispatch & ed = (EventDispatch &) loop_mech; - ed.issueDelete(callback); + event_dispatch & ed = (event_dispatch &) loop_mech; + ed.issue_delete(callback); release_lock(qnode); } - void reserveChildWatch(BaseChildWatcher *callback) + void reserve_child_watch(base_child_watcher *callback) { + auto & ed = (event_dispatch &) loop_mech; + std::lock_guard guard(ed.lock); + loop_mech.prepare_watcher(callback); try { - loop_mech.reserveChildWatch(callback->watch_handle); + loop_mech.reserve_child_watch_nolock(callback->watch_handle); } catch (...) { loop_mech.release_watcher(callback); @@ -899,17 +730,23 @@ class event_loop } } - void unreserve(BaseChildWatcher *callback) noexcept + void unreserve(base_child_watcher *callback) noexcept { - loop_mech.unreserveChildWatch(callback->watch_handle); + auto & ed = (event_dispatch &) loop_mech; + std::lock_guard guard(ed.lock); + + loop_mech.unreserve_child_watch(callback->watch_handle); loop_mech.release_watcher(callback); } - void registerChild(BaseChildWatcher *callback, pid_t child) + void register_child(base_child_watcher *callback, pid_t child) { + auto & ed = (event_dispatch &) loop_mech; + std::lock_guard guard(ed.lock); + loop_mech.prepare_watcher(callback); try { - loop_mech.addChildWatch(callback->watch_handle, child, callback); + loop_mech.add_child_watch_nolock(callback->watch_handle, child, callback); } catch (...) { loop_mech.release_watcher(callback); @@ -917,97 +754,121 @@ class event_loop } } - void registerReservedChild(BaseChildWatcher *callback, pid_t child) noexcept + void register_reserved_child(base_child_watcher *callback, pid_t child) noexcept { - loop_mech.addReservedChildWatch(callback->watch_handle, child, callback); + loop_mech.add_reserved_child_watch(callback->watch_handle, child, callback); } - void registerReservedChild_nolock(BaseChildWatcher *callback, pid_t child) noexcept + void register_reserved_child_nolock(base_child_watcher *callback, pid_t child) noexcept { - loop_mech.addReservedChildWatch_nolock(callback->watch_handle, child, callback); + loop_mech.add_reserved_child_watch_nolock(callback->watch_handle, child, callback); } - void deregister(BaseChildWatcher *callback, pid_t child) noexcept + void deregister(base_child_watcher *callback, pid_t child) noexcept { - loop_mech.removeChildWatch(callback->watch_handle); + loop_mech.remove_child_watch(callback->watch_handle); waitqueue_node qnode; get_attn_lock(qnode); - EventDispatch & ed = (EventDispatch &) loop_mech; - ed.issueDelete(callback); + event_dispatch & ed = (event_dispatch &) loop_mech; + ed.issue_delete(callback); release_lock(qnode); } // Stop watching a child process, but retain watch reservation so that another child can be // watched without running into resource allocation issues. - void stop_watch(BaseChildWatcher *callback) noexcept + void stop_watch(base_child_watcher *callback) noexcept { loop_mech.stop_child_watch(callback->watch_handle); } - void registerTimer(BaseTimerWatcher *callback, clock_type clock) + void register_timer(base_timer_watcher *callback, clock_type clock) { + auto & ed = (event_dispatch &) loop_mech; + std::lock_guard guard(ed.lock); + loop_mech.prepare_watcher(callback); try { - loop_mech.addTimer(callback->timer_handle, callback, clock); + loop_mech.add_timer_nolock(callback->timer_handle, callback, clock); } catch (...) { loop_mech.release_watcher(callback); } } - void setTimer(BaseTimerWatcher *callBack, const timespec &timeout, clock_type clock) noexcept + void set_timer(base_timer_watcher *callBack, const timespec &timeout, clock_type clock) noexcept { struct timespec interval {0, 0}; - loop_mech.setTimer(callBack->timer_handle, timeout, interval, true, clock); + loop_mech.set_timer(callBack->timer_handle, timeout, interval, true, clock); } - void setTimer(BaseTimerWatcher *callBack, const timespec &timeout, const timespec &interval, + void set_timer(base_timer_watcher *callBack, const timespec &timeout, const timespec &interval, clock_type clock) noexcept { - loop_mech.setTimer(callBack->timer_handle, timeout, interval, true, clock); + loop_mech.set_timer(callBack->timer_handle, timeout, interval, true, clock); } - void setTimerRel(BaseTimerWatcher *callBack, const timespec &timeout, clock_type clock) noexcept + void set_timer_rel(base_timer_watcher *callBack, const timespec &timeout, clock_type clock) noexcept { struct timespec interval {0, 0}; - loop_mech.setTimerRel(callBack->timer_handle, timeout, interval, true, clock); + loop_mech.set_timer_rel(callBack->timer_handle, timeout, interval, true, clock); } - void setTimerRel(BaseTimerWatcher *callBack, const timespec &timeout, + void set_timer_rel(base_timer_watcher *callBack, const timespec &timeout, const timespec &interval, clock_type clock) noexcept { - loop_mech.setTimerRel(callBack->timer_handle, timeout, interval, true, clock); + loop_mech.set_timer_rel(callBack->timer_handle, timeout, interval, true, clock); } - void stop_timer(BaseTimerWatcher *callback, clock_type clock) noexcept + void set_timer_enabled(base_timer_watcher *callback, clock_type clock, bool enabled) noexcept + { + loop_mech.enable_timer(callback->timer_handle, enabled, clock); + } + + void set_timer_enabled_nolock(base_timer_watcher *callback, clock_type clock, bool enabled) noexcept + { + loop_mech.enable_timer_nolock(callback->timer_handle, enabled, clock); + } + + void stop_timer(base_timer_watcher *callback, clock_type clock) noexcept { loop_mech.stop_timer(callback->timer_handle, clock); } - void deregister(BaseTimerWatcher *callback, clock_type clock) noexcept + void deregister(base_timer_watcher *callback, clock_type clock) noexcept { - loop_mech.removeTimer(callback->timer_handle, clock); + loop_mech.remove_timer(callback->timer_handle, clock); waitqueue_node qnode; get_attn_lock(qnode); - EventDispatch & ed = (EventDispatch &) loop_mech; - ed.issueDelete(callback); + event_dispatch & ed = (event_dispatch &) loop_mech; + ed.issue_delete(callback); release_lock(qnode); } - void dequeue_watcher(BaseWatcher *watcher) noexcept + void dequeue_watcher(base_watcher *watcher) noexcept { - loop_mech.dequeueWatcher(watcher); + loop_mech.dequeue_watcher(watcher); } - void requeue_watcher(BaseWatcher *watcher) noexcept + void requeue_watcher(base_watcher *watcher) noexcept { - loop_mech.queueWatcher(watcher); + loop_mech.queue_watcher(watcher); + + // We need to signal any thread that is currently waiting on the loop mechanism, so that it wakes + // and processes the newly queued watcher: + + wait_lock.lock(); + bool attn_q_empty = attn_waitqueue.is_empty(); + wait_lock.unlock(); + + if (! attn_q_empty) { + loop_mech.interrupt_wait(); + } } // Acquire the attention lock (when held, ensures that no thread is polling the AEN @@ -1017,9 +878,9 @@ class event_loop { std::unique_lock ulock(wait_lock); attn_waitqueue.queue(&qnode); - if (! attn_waitqueue.checkHead(qnode)) { + if (! attn_waitqueue.check_head(qnode)) { loop_mech.interrupt_wait(); - while (! attn_waitqueue.checkHead(qnode)) { + while (! attn_waitqueue.check_head(qnode)) { qnode.wait(ulock); } } @@ -1032,7 +893,7 @@ class event_loop void get_pollwait_lock(waitqueue_node &qnode) noexcept { std::unique_lock ulock(wait_lock); - if (attn_waitqueue.isEmpty()) { + if (attn_waitqueue.is_empty()) { // Queue is completely empty: attn_waitqueue.queue(&qnode); } @@ -1040,7 +901,7 @@ class event_loop wait_waitqueue.queue(&qnode); } - while (! attn_waitqueue.checkHead(qnode)) { + while (! attn_waitqueue.check_head(qnode)) { qnode.wait(ulock); } } @@ -1054,8 +915,8 @@ class event_loop nhead->signal(); } else { - if (! wait_waitqueue.isEmpty()) { - auto nhead = wait_waitqueue.getHead(); + if (! wait_waitqueue.is_empty()) { + auto nhead = wait_waitqueue.get_head(); wait_waitqueue.unqueue(); attn_waitqueue.queue(nhead); nhead->signal(); @@ -1063,34 +924,34 @@ class event_loop } } - void processSignalRearm(BaseSignalWatcher * bsw, rearm rearmType) noexcept + void process_signal_rearm(base_signal_watcher * bsw, rearm rearm_type) noexcept { // Called with lock held - if (rearmType == rearm::REARM) { - loop_mech.rearmSignalWatch_nolock(bsw->siginfo.get_signo()); + if (rearm_type == rearm::REARM) { + loop_mech.rearm_signal_watch_nolock(bsw->siginfo.get_signo(), bsw); } - else if (rearmType == rearm::REMOVE) { - loop_mech.removeSignalWatch_nolock(bsw->siginfo.get_signo()); + else if (rearm_type == rearm::REMOVE) { + loop_mech.remove_signal_watch_nolock(bsw->siginfo.get_signo()); } // Note that signal watchers cannot (currently) be disarmed } // Process rearm return for fd_watcher, including the primary watcher of a bidi_fd_watcher - rearm processFdRearm(BaseFdWatcher * bfw, rearm rearmType, bool is_multi_watch) noexcept + rearm process_fd_rearm(base_fd_watcher * bfw, rearm rearm_type, bool is_multi_watch) noexcept { - bool emulatedfd = static_cast(bfw)->emulatefd; + bool emulatedfd = static_cast(bfw)->emulatefd; // Called with lock held if (is_multi_watch) { - BaseBidiFdWatcher * bdfw = static_cast(bfw); + base_bidi_fd_watcher * bdfw = static_cast(bfw); - if (rearmType == rearm::REMOVE) { + if (rearm_type == rearm::REMOVE) { bdfw->read_removed = 1; - if (LoopTraits::has_separate_rw_fd_watches) { + if (backend_traits_t::has_separate_rw_fd_watches) { bdfw->watch_flags &= ~IN_EVENTS; if (! emulatedfd) { - loop_mech.removeFdWatch_nolock(bdfw->watch_fd, IN_EVENTS); + loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS); } return bdfw->write_removed ? rearm::REMOVE : rearm::NOOP; } @@ -1099,7 +960,7 @@ class event_loop if (bdfw->watch_flags & IN_EVENTS) { bdfw->watch_flags &= ~IN_EVENTS; if (! emulatedfd) { - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags); + loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags); } } return rearm::NOOP; @@ -1107,226 +968,233 @@ class event_loop else { // both removed: actually remove if (! emulatedfd) { - loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */); + loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, 0 /* not used */); } return rearm::REMOVE; } } } - else if (rearmType == rearm::DISARM) { + else if (rearm_type == rearm::DISARM) { bdfw->watch_flags &= ~IN_EVENTS; if (! emulatedfd) { - if (! LoopTraits::has_separate_rw_fd_watches) { + if (! backend_traits_t::has_separate_rw_fd_watches) { int watch_flags = bdfw->watch_flags; - // without separate r/w watches, enableFdWatch actually sets + // without separate r/w watches, enable_fd_watch actually sets // which sides are enabled (i.e. can be used to disable): - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, - static_cast(bdfw), + loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, + static_cast(bdfw), (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); } else { - loop_mech.disableFdWatch_nolock(bdfw->watch_fd, IN_EVENTS); + loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS); } } } - else if (rearmType == rearm::REARM) { + else if (rearm_type == rearm::REARM) { bdfw->watch_flags |= IN_EVENTS; if (! emulatedfd) { - if (! LoopTraits::has_separate_rw_fd_watches) { + if (! backend_traits_t::has_separate_rw_fd_watches) { int watch_flags = bdfw->watch_flags; - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, - static_cast(bdfw), + loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, + static_cast(bdfw), (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); } else { - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, - static_cast(bdfw), + loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, + static_cast(bdfw), IN_EVENTS | ONE_SHOT); } } else { - rearmType = rearm::REQUEUE; + rearm_type = rearm::REQUEUE; } } - else if (rearmType == rearm::NOOP) { + else if (rearm_type == rearm::NOOP) { if (bdfw->emulatefd) { if (bdfw->watch_flags & IN_EVENTS) { - rearmType = rearm::REQUEUE; + rearm_type = rearm::REQUEUE; } } } - return rearmType; + return rearm_type; } else { // Not multi-watch: if (emulatedfd) { - if (rearmType == rearm::REARM) { + if (rearm_type == rearm::REARM) { bfw->emulate_enabled = true; - rearmType = rearm::REQUEUE; + rearm_type = rearm::REQUEUE; } - else if (rearmType == rearm::DISARM) { + else if (rearm_type == rearm::DISARM) { bfw->emulate_enabled = false; } - else if (rearmType == rearm::NOOP) { + else if (rearm_type == rearm::NOOP) { if (bfw->emulate_enabled) { - rearmType = rearm::REQUEUE; + rearm_type = rearm::REQUEUE; } } } - else if (rearmType == rearm::REARM) { - loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw, + else if (rearm_type == rearm::REARM) { + loop_mech.enable_fd_watch_nolock(bfw->watch_fd, bfw, (bfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); } - else if (rearmType == rearm::DISARM) { - loop_mech.disableFdWatch_nolock(bfw->watch_fd, bfw->watch_flags); + else if (rearm_type == rearm::DISARM) { + loop_mech.disable_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags); } - else if (rearmType == rearm::REMOVE) { - loop_mech.removeFdWatch_nolock(bfw->watch_fd, bfw->watch_flags); + else if (rearm_type == rearm::REMOVE) { + loop_mech.remove_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags); } - return rearmType; + return rearm_type; } } // Process re-arm for the secondary (output) watcher in a Bi-direction Fd watcher. - rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, BaseWatcher * outw, rearm rearmType) noexcept + rearm process_secondary_rearm(base_bidi_fd_watcher * bdfw, base_watcher * outw, rearm rearm_type) noexcept { bool emulatedfd = outw->emulatefd; // Called with lock held if (emulatedfd) { - if (rearmType == rearm::REMOVE) { + if (rearm_type == rearm::REMOVE) { bdfw->write_removed = 1; bdfw->watch_flags &= ~OUT_EVENTS; - rearmType = bdfw->read_removed ? rearm::REMOVE : rearm::NOOP; + rearm_type = bdfw->read_removed ? rearm::REMOVE : rearm::NOOP; } - else if (rearmType == rearm::DISARM) { + else if (rearm_type == rearm::DISARM) { bdfw->watch_flags &= ~OUT_EVENTS; } - else if (rearmType == rearm::REARM) { + else if (rearm_type == rearm::REARM) { bdfw->watch_flags |= OUT_EVENTS; - rearmType = rearm::REQUEUE; + rearm_type = rearm::REQUEUE; } - else if (rearmType == rearm::NOOP) { + else if (rearm_type == rearm::NOOP) { if (bdfw->watch_flags & OUT_EVENTS) { - rearmType = rearm::REQUEUE; + rearm_type = rearm::REQUEUE; } } - return rearmType; + return rearm_type; } - else if (rearmType == rearm::REMOVE) { + else if (rearm_type == rearm::REMOVE) { bdfw->write_removed = 1; - if (LoopTraits::has_separate_rw_fd_watches) { + if (backend_traits_t::has_separate_rw_fd_watches) { bdfw->watch_flags &= ~OUT_EVENTS; - loop_mech.removeFdWatch_nolock(bdfw->watch_fd, OUT_EVENTS); + loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, OUT_EVENTS); return bdfw->read_removed ? rearm::REMOVE : rearm::NOOP; } else { if (! bdfw->read_removed) { if (bdfw->watch_flags & OUT_EVENTS) { bdfw->watch_flags &= ~OUT_EVENTS; - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags); + loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags); } return rearm::NOOP; } else { // both removed: actually remove - loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */); + loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, 0 /* not used */); return rearm::REMOVE; } } } - else if (rearmType == rearm::DISARM) { + else if (rearm_type == rearm::DISARM) { bdfw->watch_flags &= ~OUT_EVENTS; - if (! LoopTraits::has_separate_rw_fd_watches) { + if (! backend_traits_t::has_separate_rw_fd_watches) { int watch_flags = bdfw->watch_flags; - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, - static_cast(bdfw), + loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, + static_cast(bdfw), (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); } else { - loop_mech.disableFdWatch_nolock(bdfw->watch_fd, OUT_EVENTS); + loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, OUT_EVENTS); } } - else if (rearmType == rearm::REARM) { + else if (rearm_type == rearm::REARM) { bdfw->watch_flags |= OUT_EVENTS; - if (! LoopTraits::has_separate_rw_fd_watches) { + if (! backend_traits_t::has_separate_rw_fd_watches) { int watch_flags = bdfw->watch_flags; - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, - static_cast(bdfw), + loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, + static_cast(bdfw), (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); } else { - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, - static_cast(bdfw), + loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, + static_cast(bdfw), OUT_EVENTS | ONE_SHOT); } } - return rearmType; + return rearm_type; } - void process_child_watch_rearm(BaseChildWatcher *bcw, rearm rearm_type) noexcept + void process_child_watch_rearm(base_child_watcher *bcw, rearm rearm_type) noexcept { if (rearm_type == rearm::REMOVE || rearm_type == rearm::DISARM) { - loop_mech.unreserveChildWatch_nolock(bcw->watch_handle); + loop_mech.unreserve_child_watch_nolock(bcw->watch_handle); } } - void processTimerRearm(BaseTimerWatcher *btw, rearm rearmType) noexcept + void process_timer_rearm(base_timer_watcher *btw, rearm rearm_type) noexcept { // Called with lock held - if (rearmType == rearm::REARM) { - loop_mech.enableTimer_nolock(btw->timer_handle, true, btw->clock); + if (rearm_type == rearm::REARM) { + loop_mech.enable_timer_nolock(btw->timer_handle, true, btw->clock); } - else if (rearmType == rearm::REMOVE) { - loop_mech.removeTimer_nolock(btw->timer_handle, btw->clock); + else if (rearm_type == rearm::REMOVE) { + loop_mech.remove_timer_nolock(btw->timer_handle, btw->clock); } - else if (rearmType == rearm::DISARM) { - loop_mech.enableTimer_nolock(btw->timer_handle, false, btw->clock); + else if (rearm_type == rearm::DISARM) { + loop_mech.enable_timer_nolock(btw->timer_handle, false, btw->clock); } } - // Process all queued events; returns true if any events were processed. - bool processEvents() noexcept + // Process queued events; returns true if any events were processed. + // limit - maximum number of events to process before returning; -1 for + // no limit. + bool process_events(int limit) noexcept { - EventDispatch & ed = (EventDispatch &) loop_mech; + auto & ed = (event_dispatch &) loop_mech; ed.lock.lock(); // So this pulls *all* currently pending events and processes them in the current thread. // That's probably good for throughput, but maybe the behaviour should be configurable. - BaseWatcher * pqueue = ed.pullEvent(); + base_watcher * pqueue = ed.pull_event(); bool active = false; - while (pqueue != nullptr) { + while (pqueue != nullptr && limit != 0) { pqueue->active = true; active = true; - BaseBidiFdWatcher *bbfw = nullptr; + base_bidi_fd_watcher *bbfw = nullptr; // (Above variables are initialised only to silence compiler warnings). if (pqueue->watchType == watch_type_t::SECONDARYFD) { - // construct a pointer to the main watcher: + // construct a pointer to the main watcher (using char* arithmetic): char * rp = (char *)pqueue; + + // Here we take the offset of a member from a non-standard-layout class, which is + // specified to have undefined result by the C++ language standard, but which + // in practice works fine: _Pragma ("GCC diagnostic push") _Pragma ("GCC diagnostic ignored \"-Winvalid-offsetof\"") - rp -= offsetof(BaseBidiFdWatcher, outWatcher); + rp -= offsetof(base_bidi_fd_watcher, out_watcher); _Pragma ("GCC diagnostic pop") - bbfw = (BaseBidiFdWatcher *)rp; + bbfw = (base_bidi_fd_watcher *)rp; // issue a secondary dispatch: bbfw->dispatch_second(this); - pqueue = ed.pullEvent(); + pqueue = ed.pull_event(); continue; } pqueue->dispatch(this); - pqueue = ed.pullEvent(); + pqueue = ed.pull_event(); + if (limit > 0) limit--; } ed.lock.unlock(); @@ -1334,7 +1202,6 @@ class event_loop } public: - using mutex_t = T_Mutex; using fd_watcher = dprivate::fd_watcher; using bidi_fd_watcher = dprivate::bidi_fd_watcher; @@ -1348,33 +1215,33 @@ class event_loop template using child_proc_watcher_impl = dprivate::child_proc_watcher_impl; template using timer_impl = dprivate::timer_impl; - // Poll the event loop and process any pending events. If no events are pending, wait + // Poll the event loop and process any pending events (up to a limit). If no events are pending, wait // for and process at least one event. - void run() noexcept + void run(int limit = -1) noexcept { // Poll the mechanism first, in case high-priority events are pending: waitqueue_node qnode; get_pollwait_lock(qnode); - loop_mech.pullEvents(false); + loop_mech.pull_events(false); release_lock(qnode); - while (! processEvents()) { + while (! process_events(limit)) { // Pull events from the AEN mechanism and insert them in our internal queue: get_pollwait_lock(qnode); - loop_mech.pullEvents(true); + loop_mech.pull_events(true); release_lock(qnode); } } - // Poll the event loop and process any pending events - void poll() noexcept + // Poll the event loop and process any pending events (up to a limit). + void poll(int limit = -1) noexcept { waitqueue_node qnode; get_pollwait_lock(qnode); - loop_mech.pullEvents(false); + loop_mech.pull_events(false); release_lock(qnode); - processEvents(); + process_events(limit); } // Get the current time corresponding to a specific clock. @@ -1401,50 +1268,51 @@ namespace dprivate { // Posix signal event watcher template -class signal_watcher : private dprivate::base_signal_watcher +class signal_watcher : private dprivate::base_signal_watcher { template friend class signal_watcher_impl; - using BaseWatcher = dprivate::base_watcher; + using base_watcher = dprivate::base_watcher; using T_Mutex = typename EventLoop::mutex_t; public: - using siginfo_p = typename dprivate::base_signal_watcher::siginfo_p; + using event_loop_t = EventLoop; + using siginfo_p = typename signal_watcher::siginfo_p; // Register this watcher to watch the specified signal. // If an attempt is made to register with more than one event loop at // a time, behaviour is undefined. The signal should be masked before // call. - inline void add_watch(EventLoop &eloop, int signo, int prio = DEFAULT_PRIORITY) + inline void add_watch(event_loop_t &eloop, int signo, int prio = DEFAULT_PRIORITY) { - BaseWatcher::init(); + base_watcher::init(); this->priority = prio; this->siginfo.set_signo(signo); - eloop.registerSignal(this, signo); + eloop.register_signal(this, signo); } - inline void deregister(EventLoop &eloop) noexcept + inline void deregister(event_loop_t &eloop) noexcept { eloop.deregister(this, this->siginfo.get_signo()); } template - static signal_watcher *add_watch(EventLoop &eloop, int signo, T watchHndlr) + static signal_watcher *add_watch(event_loop_t &eloop, int signo, T watch_hndlr) { - class LambdaSigWatcher : public signal_watcher_impl + class lambda_sig_watcher : public signal_watcher_impl { private: - T watchHndlr; + T watch_hndlr; public: - LambdaSigWatcher(T watchHandlr_a) : watchHndlr(watchHandlr_a) + lambda_sig_watcher(T watch_handlr_a) : watch_hndlr(watch_handlr_a) { // } - rearm received(EventLoop &eloop, int signo, siginfo_p siginfo) + rearm received(event_loop_t &eloop, int signo, siginfo_p siginfo) { - return watchHndlr(eloop, signo, siginfo); + return watch_hndlr(eloop, signo, siginfo); } void watch_removed() noexcept override @@ -1453,7 +1321,7 @@ class signal_watcher : private dprivate::base_signal_watcheradd_watch(eloop, signo); return lsw; } @@ -1467,23 +1335,23 @@ class signal_watcher_impl : public signal_watcher void dispatch(void *loop_ptr) noexcept override { EventLoop &loop = *static_cast(loop_ptr); - loop.getBaseLock().unlock(); + loop.get_base_lock().unlock(); - auto rearmType = static_cast(this)->received(loop, this->siginfo.get_signo(), this->siginfo); + auto rearm_type = static_cast(this)->received(loop, this->siginfo.get_signo(), this->siginfo); - loop.getBaseLock().lock(); + loop.get_base_lock().lock(); - if (rearmType != rearm::REMOVED) { + if (rearm_type != rearm::REMOVED) { this->active = false; if (this->deleteme) { // We don't want a watch that is marked "deleteme" to re-arm itself. - rearmType = rearm::REMOVE; + rearm_type = rearm::REMOVE; } - loop.processSignalRearm(this, rearmType); + loop.process_signal_rearm(this, rearm_type); - post_dispatch(loop, this, rearmType); + post_dispatch(loop, this, rearm_type); } } }; @@ -1494,12 +1362,12 @@ class fd_watcher : private dprivate::base_fd_watcher friend class fd_watcher_impl; - using BaseWatcher = dprivate::base_watcher; - using T_Mutex = typename EventLoop::mutex_t; + using base_watcher = dprivate::base_watcher; + using mutex_t = typename EventLoop::mutex_t; protected: - // Set the types of event to watch. Only supported if LoopTraits::has_bidi_fd_watch + // Set the types of event to watch. Only supported if loop_traits_t_t::has_bidi_fd_watch // is true; otherwise has unspecified behavior. // Only safe to call from within the callback handler (fdEvent). Might not take // effect until the current callback handler returns with REARM. @@ -1510,11 +1378,13 @@ class fd_watcher : private dprivate::base_fd_watcherpriority = prio; this->watch_fd = fd; this->watch_flags = flags; - eloop.registerFd(this, fd, flags, enabled, true); + eloop.register_fd(this, fd, flags, enabled, true); } - void add_watch_noemu(EventLoop &eloop, int fd, int flags, bool enabled = true, int prio = DEFAULT_PRIORITY) + void add_watch_noemu(event_loop_t &eloop, int fd, int flags, bool enabled = true, int prio = DEFAULT_PRIORITY) { - BaseWatcher::init(); + base_watcher::init(); this->priority = prio; this->watch_fd = fd; this->watch_flags = flags; - eloop.registerFd(this, fd, flags, enabled, false); + eloop.register_fd(this, fd, flags, enabled, false); } int get_watched_fd() @@ -1554,19 +1424,19 @@ class fd_watcher : private dprivate::base_fd_watcherwatch_fd); } - void set_enabled(EventLoop &eloop, bool enable) noexcept + void set_enabled(event_loop_t &eloop, bool enable) noexcept { - std::lock_guard guard(eloop.getBaseLock()); + std::lock_guard guard(eloop.get_base_lock()); if (this->emulatefd) { this->emulate_enabled = enable; } else { - eloop.setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable); + eloop.set_fd_enabled_nolock(this, this->watch_fd, this->watch_flags, enable); } if (! enable) { eloop.dequeue_watcher(this); @@ -1576,9 +1446,9 @@ class fd_watcher : private dprivate::base_fd_watcher - static fd_watcher *add_watch(EventLoop &eloop, int fd, int flags, T watchHndlr) + static fd_watcher *add_watch(event_loop_t &eloop, int fd, int flags, T watchHndlr) { - class lambda_fd_watcher : public fd_watcher_impl + class lambda_fd_watcher : public fd_watcher_impl { private: T watchHndlr; @@ -1589,7 +1459,7 @@ class fd_watcher : private dprivate::base_fd_watcher // In case emulating, clear enabled here; REARM or explicit set_enabled will re-enable. this->emulate_enabled = false; - loop.getBaseLock().unlock(); + loop.get_base_lock().unlock(); - auto rearmType = static_cast(this)->fd_event(loop, this->watch_fd, this->event_flags); + auto rearm_type = static_cast(this)->fd_event(loop, this->watch_fd, this->event_flags); - loop.getBaseLock().lock(); + loop.get_base_lock().lock(); - if (rearmType != rearm::REMOVED) { + if (rearm_type != rearm::REMOVED) { this->event_flags = 0; this->active = false; if (this->deleteme) { // We don't want a watch that is marked "deleteme" to re-arm itself. - rearmType = rearm::REMOVE; + rearm_type = rearm::REMOVE; } - rearmType = loop.processFdRearm(this, rearmType, false); + rearm_type = loop.process_fd_rearm(this, rearm_type, false); - post_dispatch(loop, this, rearmType); + post_dispatch(loop, this, rearm_type); } } }; @@ -1648,8 +1518,8 @@ class bidi_fd_watcher : private dprivate::base_bidi_fd_watcher friend class bidi_fd_watcher_impl; - using BaseWatcher = dprivate::base_watcher; - using T_Mutex = typename EventLoop::mutex_t; + using base_watcher = dprivate::base_watcher; + using mutex_t = typename EventLoop::mutex_t; void set_watch_enabled(EventLoop &eloop, bool in, bool b) { @@ -1662,14 +1532,14 @@ class bidi_fd_watcher : private dprivate::base_bidi_fd_watcherwatch_flags &= ~events; } - dprivate::base_watcher * watcher = in ? this : &this->outWatcher; + dprivate::base_watcher * watcher = in ? this : &this->out_watcher; - if (! basewatcher_get_emulatefd(*watcher)) { + if (! watcher->emulatefd) { if (EventLoop::loop_traits_t::has_separate_rw_fd_watches) { - eloop.setFdEnabled_nolock(watcher, this->watch_fd, events | ONE_SHOT, b); + eloop.set_fd_enabled_nolock(this, this->watch_fd, events | ONE_SHOT, b); } else { - eloop.setFdEnabled_nolock(this, this->watch_fd, + eloop.set_fd_enabled_nolock(this, this->watch_fd, (this->watch_flags & IO_EVENTS) | ONE_SHOT, (this->watch_flags & IO_EVENTS) != 0); } @@ -1682,18 +1552,20 @@ class bidi_fd_watcher : private dprivate::base_bidi_fd_watcher guard(eloop.getBaseLock()); - bool use_emulation = this->emulatefd || basewatcher_get_emulatefd(this->outWatcher); + std::lock_guard guard(eloop.get_base_lock()); + bool use_emulation = this->emulatefd || this->out_watcher.emulatefd; if (use_emulation || EventLoop::loop_traits_t::has_separate_rw_fd_watches) { - set_watch_enabled(eloop, true, (newFlags & IN_EVENTS) != 0); - set_watch_enabled(eloop, false, (newFlags & OUT_EVENTS) != 0); + set_watch_enabled(eloop, true, (new_flags & IN_EVENTS) != 0); + set_watch_enabled(eloop, false, (new_flags & OUT_EVENTS) != 0); } else { - this->watch_flags = (this->watch_flags & ~IO_EVENTS) | newFlags; - eloop.setFdEnabled((dprivate::base_watcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true); + this->watch_flags = (this->watch_flags & ~IO_EVENTS) | new_flags; + eloop.set_fd_enabled_nolock((dprivate::base_watcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true); } } @@ -1722,30 +1594,30 @@ class bidi_fd_watcher : private dprivate::base_bidi_fd_watcheroutWatcher.BaseWatcher::init(); + base_watcher::init(); + this->out_watcher.base_watcher::init(); this->watch_fd = fd; this->watch_flags = flags | dprivate::multi_watch; this->read_removed = false; this->write_removed = false; this->priority = inprio; - this->set_priority(this->outWatcher, outprio); - eloop.registerFd(this, fd, flags, true); + this->set_priority(this->out_watcher, outprio); + eloop.register_fd(this, fd, flags, true); } - void add_watch_noemu(EventLoop &eloop, int fd, int flags, int inprio = DEFAULT_PRIORITY, int outprio = DEFAULT_PRIORITY) + void add_watch_noemu(event_loop_t &eloop, int fd, int flags, int inprio = DEFAULT_PRIORITY, int outprio = DEFAULT_PRIORITY) { - BaseWatcher::init(); - this->outWatcher.BaseWatcher::init(); + base_watcher::init(); + this->out_watcher.base_watcher::init(); this->watch_fd = fd; this->watch_flags = flags | dprivate::multi_watch; this->read_removed = false; this->write_removed = false; this->priority = inprio; - this->set_priority(this->outWatcher, outprio); - eloop.registerFd(this, fd, flags, false); + this->set_priority(this->out_watcher, outprio); + eloop.register_fd(this, fd, flags, false); } int get_watched_fd() @@ -1761,33 +1633,33 @@ class bidi_fd_watcher : private dprivate::base_bidi_fd_watcherwatch_fd); } template - static bidi_fd_watcher *add_watch(EventLoop &eloop, int fd, int flags, T watchHndlr) + static bidi_fd_watcher *add_watch(event_loop_t &eloop, int fd, int flags, T watch_hndlr) { - class LambdaBidiWatcher : public bidi_fd_watcher_impl + class lambda_bidi_watcher : public bidi_fd_watcher_impl { private: - T watchHndlr; + T watch_hndlr; public: - LambdaBidiWatcher(T watchHandlr_a) : watchHndlr(watchHandlr_a) + lambda_bidi_watcher(T watch_handlr_a) : watch_hndlr(watch_handlr_a) { // } - rearm read_ready(EventLoop &eloop, int fd) + rearm read_ready(event_loop_t &eloop, int fd) { - return watchHndlr(eloop, fd, IN_EVENTS); + return watch_hndlr(eloop, fd, IN_EVENTS); } - rearm write_ready(EventLoop &eloop, int fd) + rearm write_ready(event_loop_t &eloop, int fd) { - return watchHndlr(eloop, fd, OUT_EVENTS); + return watch_hndlr(eloop, fd, OUT_EVENTS); } void watch_removed() noexcept override @@ -1796,7 +1668,7 @@ class bidi_fd_watcher : private dprivate::base_bidi_fd_watcheradd_watch(eloop, fd, flags); return lfd; } @@ -1812,52 +1684,52 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher { EventLoop &loop = *static_cast(loop_ptr); this->emulate_enabled = false; - loop.getBaseLock().unlock(); + loop.get_base_lock().unlock(); - auto rearmType = static_cast(this)->read_ready(loop, this->watch_fd); + auto rearm_type = static_cast(this)->read_ready(loop, this->watch_fd); - loop.getBaseLock().lock(); + loop.get_base_lock().lock(); - if (rearmType != rearm::REMOVED) { + if (rearm_type != rearm::REMOVED) { this->event_flags &= ~IN_EVENTS; this->active = false; if (this->deleteme) { // We don't want a watch that is marked "deleteme" to re-arm itself. - rearmType = rearm::REMOVE; + rearm_type = rearm::REMOVE; } - rearmType = loop.processFdRearm(this, rearmType, true); + rearm_type = loop.process_fd_rearm(this, rearm_type, true); - post_dispatch(loop, this, rearmType); + post_dispatch(loop, this, rearm_type); } } void dispatch_second(void *loop_ptr) noexcept override { - auto &outwatcher = bidi_fd_watcher::outWatcher; + auto &outwatcher = bidi_fd_watcher::out_watcher; EventLoop &loop = *static_cast(loop_ptr); - loop.getBaseLock().unlock(); + loop.get_base_lock().unlock(); - auto rearmType = static_cast(this)->write_ready(loop, this->watch_fd); + auto rearm_type = static_cast(this)->write_ready(loop, this->watch_fd); - loop.getBaseLock().lock(); + loop.get_base_lock().lock(); - if (rearmType != rearm::REMOVED) { + if (rearm_type != rearm::REMOVED) { this->event_flags &= ~OUT_EVENTS; - basewatcher_set_active(outwatcher, false); - if (basewatcher_get_deleteme(outwatcher)) { + outwatcher.active = false; + if (outwatcher.deleteme) { // We don't want a watch that is marked "deleteme" to re-arm itself. - rearmType = rearm::REMOVE; + rearm_type = rearm::REMOVE; } - rearmType = loop.processSecondaryRearm(this, &outwatcher, rearmType); + rearm_type = loop.process_secondary_rearm(this, &outwatcher, rearm_type); - if (rearmType == rearm::REQUEUE) { - post_dispatch(loop, &outwatcher, rearmType); + if (rearm_type == rearm::REQUEUE) { + post_dispatch(loop, &outwatcher, rearm_type); } else { - post_dispatch(loop, this, rearmType); + post_dispatch(loop, this, rearm_type); } } } @@ -1869,19 +1741,38 @@ class child_proc_watcher : private dprivate::base_child_watcher friend class child_proc_watcher_impl; - using BaseWatcher = dprivate::base_watcher; - using T_Mutex = typename EventLoop::mutex_t; + using base_watcher = dprivate::base_watcher; + using mutex_t = typename EventLoop::mutex_t; public: + + using event_loop_t = EventLoop; + + // send a signal to this process, if it is still running, in a race-free manner. + // return is as for POSIX kill(); return is -1 with errno=ESRCH if process has + // already terminated. + int send_signal(event_loop_t &loop, int signo) noexcept + { + auto reaper_mutex = loop.get_reaper_mutex(); + std::lock_guard guard(reaper_mutex); + + if (this->child_termd) { + errno = ESRCH; + return -1; + } + + return kill(this->watch_pid, signo); + } + // Reserve resources for a child watcher with the given event loop. // Reservation can fail with std::bad_alloc. Some backends do not support - // reservation (it will always fail) - check LoopTraits::supports_childwatch_reservation. - void reserve_watch(EventLoop &eloop) + // reservation (it will always fail) - check loop_traits_t::supports_childwatch_reservation. + void reserve_watch(event_loop_t &eloop) { - eloop.reserveChildWatch(this); + eloop.reserve_child_watch(this); } - void unreserve(EventLoop &eloop) + void unreserve(event_loop_t &eloop) { eloop.unreserve(this); } @@ -1890,12 +1781,12 @@ class child_proc_watcher : private dprivate::base_child_watcherwatch_pid = child; this->priority = prio; - eloop.registerChild(this, child); + eloop.register_child(this, child); } // Register a watcher for the given child process with an event loop, @@ -1904,21 +1795,21 @@ class child_proc_watcher : private dprivate::base_child_watcherwatch_pid = child; this->priority = prio; - eloop.registerReservedChild(this, child); + eloop.register_reserved_child(this, child); } - void deregister(EventLoop &eloop, pid_t child) noexcept + void deregister(event_loop_t &eloop, pid_t child) noexcept { eloop.deregister(this, child); } // Stop watching the currently watched child, but retain watch reservation. - void stop_watch(EventLoop &eloop) noexcept + void stop_watch(event_loop_t &eloop) noexcept { eloop.stop_watch(this); } @@ -1930,9 +1821,9 @@ class child_proc_watcher : private dprivate::base_child_watcherpriority = prio; if (EventLoop::loop_traits_t::supports_childwatch_reservation) { @@ -1941,7 +1832,7 @@ class child_proc_watcher : private dprivate::base_child_watcherwatch_pid = child; - eloop.registerReservedChild_nolock(this, child); + eloop.register_reserved_child_nolock(this, child); lock.unlock(); return child; } @@ -1970,7 +1861,7 @@ class child_proc_watcher : private dprivate::base_child_watcher guard(eloop.getBaseLock()); + std::lock_guard guard(eloop.get_base_lock()); pid_t child = ::fork(); if (child == -1) { @@ -1998,7 +1889,7 @@ class child_proc_watcher : private dprivate::base_child_watcherwatch_pid = child; - eloop.registerChild(this, child); + eloop.register_child(this, child); // Continue in child (it doesn't matter what is written): write(pipefds[1], &pipefds, sizeof(int)); @@ -2022,24 +1913,24 @@ class child_proc_watcher_impl : public child_proc_watcher void dispatch(void *loop_ptr) noexcept override { EventLoop &loop = *static_cast(loop_ptr); - loop.getBaseLock().unlock(); + loop.get_base_lock().unlock(); - auto rearmType = static_cast(this)->status_change(loop, this->watch_pid, this->child_status); + auto rearm_type = static_cast(this)->status_change(loop, this->watch_pid, this->child_status); - loop.getBaseLock().lock(); + loop.get_base_lock().lock(); - if (rearmType != rearm::REMOVED) { + if (rearm_type != rearm::REMOVED) { this->active = false; if (this->deleteme) { // We don't want a watch that is marked "deleteme" to re-arm itself. - rearmType = rearm::REMOVE; + rearm_type = rearm::REMOVE; } - loop.process_child_watch_rearm(this, rearmType); + loop.process_child_watch_rearm(this, rearm_type); - // rearmType = loop.process??; - post_dispatch(loop, this, rearmType); + // rearm_type = loop.process??; + post_dispatch(loop, this, rearm_type); } } }; @@ -2049,67 +1940,79 @@ class timer : private base_timer_watcher { template friend class timer_impl; using base_t = base_timer_watcher; + using mutex_t = typename EventLoop::mutex_t; public: + using event_loop_t = EventLoop; - void add_timer(EventLoop &eloop, clock_type clock = clock_type::MONOTONIC, int prio = DEFAULT_PRIORITY) + void add_timer(event_loop_t &eloop, clock_type clock = clock_type::MONOTONIC, int prio = DEFAULT_PRIORITY) { base_watcher::init(); this->priority = prio; this->clock = clock; - eloop.registerTimer(this, clock); + this->intervals = 0; + eloop.register_timer(this, clock); } - void arm_timer(EventLoop &eloop, const timespec &timeout) noexcept + void arm_timer(event_loop_t &eloop, const timespec &timeout) noexcept { - eloop.setTimer(this, timeout, base_t::clock); + eloop.set_timer(this, timeout, base_t::clock); } - void arm_timer(EventLoop &eloop, const timespec &timeout, const timespec &interval) noexcept + void arm_timer(event_loop_t &eloop, const timespec &timeout, const timespec &interval) noexcept { - eloop.setTimer(this, timeout, interval, base_t::clock); + eloop.set_timer(this, timeout, interval, base_t::clock); } // Arm timer, relative to now: - void arm_timer_rel(EventLoop &eloop, const timespec &timeout) noexcept + void arm_timer_rel(event_loop_t &eloop, const timespec &timeout) noexcept { - eloop.setTimerRel(this, timeout, base_t::clock); + eloop.set_timer_rel(this, timeout, base_t::clock); } - void arm_timer_rel(EventLoop &eloop, const timespec &timeout, + void arm_timer_rel(event_loop_t &eloop, const timespec &timeout, const timespec &interval) noexcept { - eloop.setTimerRel(this, timeout, interval, base_t::clock); + eloop.set_timer_rel(this, timeout, interval, base_t::clock); } - void stop_timer(EventLoop &eloop) noexcept + void stop_timer(event_loop_t &eloop) noexcept { eloop.stop_timer(this, base_t::clock); } - void deregister(EventLoop &eloop) noexcept + void set_enabled(event_loop_t &eloop, clock_type clock, bool enabled) noexcept + { + std::lock_guard guard(eloop.get_base_lock()); + eloop.set_timer_enabled_nolock(this, clock, enabled); + if (! enabled) { + eloop.dequeue_watcher(this); + } + } + + void deregister(event_loop_t &eloop) noexcept { eloop.deregister(this, this->clock); } template static timer *add_timer(EventLoop &eloop, clock_type clock, bool relative, - struct timespec &timeout, struct timespec &interval, T watchHndlr) + struct timespec &timeout, struct timespec &interval, T watch_hndlr) { - class lambda_timer : public timer_impl + class lambda_timer : public timer_impl { private: - T watchHndlr; + T watch_hndlr; public: - lambda_timer(T watchHandlr_a) : watchHndlr(watchHandlr_a) + lambda_timer(T watch_handlr_a) : watch_hndlr(watch_handlr_a) { // } - rearm timer_expiry(EventLoop &eloop, int intervals) + rearm timer_expiry(event_loop_t &eloop, int intervals) { - return watchHndlr(eloop, intervals); + return watch_hndlr(eloop, intervals); } void watch_removed() noexcept override @@ -2118,7 +2021,7 @@ class timer : private base_timer_watcher } }; - lambda_timer * lt = new lambda_timer(watchHndlr); + lambda_timer * lt = new lambda_timer(watch_hndlr); lt->add_timer(eloop, clock); if (relative) { lt->arm_timer_rel(eloop, timeout, interval); @@ -2132,7 +2035,7 @@ class timer : private base_timer_watcher // Timer expired, and the given number of intervals have elapsed before // expiry event was queued. Normally intervals == 1 to indicate no // overrun. - // virtual rearm timer_expiry(EventLoop &eloop, int intervals) = 0; + // virtual rearm timer_expiry(event_loop_t &eloop, int intervals) = 0; }; template @@ -2141,23 +2044,25 @@ class timer_impl : public timer void dispatch(void *loop_ptr) noexcept override { EventLoop &loop = *static_cast(loop_ptr); - loop.getBaseLock().unlock(); + loop.get_base_lock().unlock(); - auto rearmType = static_cast(this)->timer_expiry(loop, this->intervals); + auto intervals_report = this->intervals; + this->intervals = 0; + auto rearm_type = static_cast(this)->timer_expiry(loop, intervals_report); - loop.getBaseLock().lock(); + loop.get_base_lock().lock(); - if (rearmType != rearm::REMOVED) { + if (rearm_type != rearm::REMOVED) { this->active = false; if (this->deleteme) { // We don't want a watch that is marked "deleteme" to re-arm itself. - rearmType = rearm::REMOVE; + rearm_type = rearm::REMOVE; } - loop.processTimerRearm(this, rearmType); + loop.process_timer_rearm(this, rearm_type); - post_dispatch(loop, this, rearmType); + post_dispatch(loop, this, rearm_type); } } }; diff --git a/src/service.cc b/src/service.cc index 5ef7d2d..838d7ee 100644 --- a/src/service.cc +++ b/src/service.cc @@ -1368,7 +1368,7 @@ void base_process_service::do_restart() noexcept bool base_process_service::restart_ps_process() noexcept { - using time_val = eventloop_t::time_val; + using time_val = dasynq::time_val; time_val current_time; eventLoop.get_time(current_time, clock_type::MONOTONIC); -- 2.25.1