sigemptyset(&chld_action.sa_mask);
chld_action.sa_flags = 0;
sigaction(SIGCHLD, &chld_action, nullptr);
- loop_mech->add_signal_watch(SIGCHLD, nullptr);
+
+ // Specify a dummy user data value - sigchld_handler
+ loop_mech->add_signal_watch(SIGCHLD, (void *) dprivate::sigchld_handler);
Base::init(loop_mech);
}
};
// Epoll doesn't return the file descriptor (it can, but it can't return both file
// descriptor and user data).
int fd;
+
+ public:
+ fd_s(int fd_p) noexcept : fd(fd_p) { }
};
// File descriptor reference (passed to event callback). If the mechanism can return the
// must be stored in an fd_s instance.
class fd_r {
public:
- int getFd(fd_s ss)
+ int get_fd(fd_s ss)
{
return ss.fd;
}
};
- const static bool has_bidi_fd_watch = true;
- const static bool has_separate_rw_fd_watches = false;
+ constexpr static bool has_bidi_fd_watch = true;
+ constexpr static bool has_separate_rw_fd_watches = false;
+ constexpr static bool interrupt_after_fd_add = false;
+ constexpr static bool interrupt_after_signal_add = false;
+ constexpr static bool supports_non_oneshot_fd = true;
};
(events[i].events & EPOLLHUP) && (flags |= IN_EVENTS);
(events[i].events & EPOLLOUT) && (flags |= OUT_EVENTS);
(events[i].events & EPOLLERR) && (flags |= IN_EVENTS | OUT_EVENTS | ERR_EVENTS);
- Base::receive_fd_event(*this, fd_r(), ptr, flags);
+ auto r = Base::receive_fd_event(*this, fd_r(), ptr, flags);
+ if (std::get<0>(r) != 0) {
+ enable_fd_watch_nolock(fd_r().get_fd(std::get<1>(r)), ptr, std::get<0>(r));
+ }
}
}
}
}
template <typename T>
- void receive_fd_event(T &loop_mech, typename Base::traits_t::fd_r fd_r_a, void * userdata, int flags)
+ std::tuple<int, typename Base::traits_t::fd_s>
+ receive_fd_event(T &loop_mech, typename Base::traits_t::fd_r fd_r_a, void * userdata, int flags)
{
if (userdata == &pipe_r_fd) {
// try to clear the pipe
char buf[64];
read(pipe_r_fd, buf, 64);
+ if (Base::traits_t::supports_non_oneshot_fd) {
+ // If the loop mechanism actually persists none-oneshot marked watches, we don't need
+ // to re-enable:
+ return std::make_tuple(0, typename Base::traits_t::fd_s(pipe_r_fd));
+ }
+ else {
+ return std::make_tuple(IN_EVENTS, typename Base::traits_t::fd_s(pipe_r_fd));
+ }
}
else {
- Base::receive_fd_event(loop_mech, fd_r_a, userdata, flags);
+ return Base::receive_fd_event(loop_mech, fd_r_a, userdata, flags);
}
}
#include <system_error>
-#include <mutex>
#include <type_traits>
#include <unordered_map>
#include <vector>
+#include <tuple>
#include <sys/event.h>
#include <sys/time.h>
// File descriptor optional storage. If the mechanism can return the file descriptor, this
// class will be empty, otherwise it can hold a file descriptor.
class fd_s {
+ public:
+ fd_s(int) { }
+
DASYNQ_EMPTY_BODY
};
}
};
- const static bool has_bidi_fd_watch = false;
- const static bool has_separate_rw_fd_watches = true;
+ constexpr static bool has_bidi_fd_watch = false;
+ constexpr static bool has_separate_rw_fd_watches = true;
+ constexpr static bool interrupt_after_fd_add = false;
+ constexpr static bool interrupt_after_signal_add = false;
+ constexpr static bool supports_non_oneshot_fd = false;
};
#if _POSIX_REALTIME_SIGNALS > 0
}
else if (events[i].filter == EVFILT_READ || events[i].filter == EVFILT_WRITE) {
int flags = events[i].filter == EVFILT_READ ? IN_EVENTS : OUT_EVENTS;
- Base::receive_fd_event(*this, fd_r(events[i].ident), events[i].udata, flags);
- events[i].flags = EV_DISABLE | EV_CLEAR;
- // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for
- // another connection).
+ auto r = Base::receive_fd_event(*this, fd_r(events[i].ident), events[i].udata, flags);
+ if (std::get<0>(r) == 0) {
+ // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for
+ // another connection).
+ events[i].flags = EV_DISABLE | EV_CLEAR;
+ }
+ else {
+ events[i].flags = EV_ENABLE;
+ }
}
else {
events[i].flags = EV_DISABLE;
// starts (if not started) a timer to timeout at the given time. Resets the expiry count to 0.
// enable: specifies whether to enable reporting of timeouts/intervals
- void set_timer(timer_handle_t &timer_id, time_val &timeouttv, struct timespec &interval,
+ void set_timer(timer_handle_t &timer_id, const timespec &timeout, const timespec &interval,
bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
{
- timespec timeout = timeouttv;
-
std::lock_guard<decltype(Base::lock)> guard(Base::lock);
timer_queue_t &timer_queue = this->queue_for_clock(clock);
}
// Set timer relative to current time:
- void set_timer_rel(timer_handle_t &timer_id, const time_val &timeouttv, const time_val &intervaltv,
+ void set_timer_rel(timer_handle_t &timer_id, const timespec &timeout, const timespec &interval,
bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
{
- timespec timeout = timeouttv;
- timespec interval = intervaltv;
-
// TODO consider caching current time somehow; need to decide then when to update cached value.
struct timespec curtime;
int posix_clock_id = (clock == clock_type::MONOTONIC) ? CLOCK_MONOTONIC : CLOCK_REALTIME;
--- /dev/null
+#include <system_error>
+#include <vector>
+#include <atomic>
+
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <sys/stat.h>
+#include <sys/select.h>
+
+#include <unistd.h>
+#include <signal.h>
+#include <setjmp.h>
+
+#include "dasynq-config.h"
+
+// "pselect"-based event loop mechanism.
+//
+
+namespace dasynq {
+
+template <class Base> class select_events;
+
+class select_traits
+{
+ public:
+
+ class sigdata_t
+ {
+ template <class Base> friend class select_events;
+
+ siginfo_t info;
+
+ public:
+ // mandatory:
+ int get_signo() { return info.si_signo; }
+ int get_sicode() { return info.si_code; }
+ pid_t get_sipid() { return info.si_pid; }
+ uid_t get_siuid() { return info.si_uid; }
+ void * get_siaddr() { return info.si_addr; }
+ int get_sistatus() { return info.si_status; }
+ int get_sival_int() { return info.si_value.sival_int; }
+ void * get_sival_ptr() { return info.si_value.sival_ptr; }
+
+ // XSI
+ int get_sierrno() { return info.si_errno; }
+
+ // XSR (streams) OB (obselete)
+#if !defined(__OpenBSD__)
+ // Note: OpenBSD doesn't have this; most other systems do. Technically it is part of the STREAMS
+ // interface.
+ int get_siband() { return info.si_band; }
+#endif
+
+ void set_signo(int signo) { info.si_signo = signo; }
+ };
+
+ class fd_r;
+
+ // File descriptor optional storage. If the mechanism can return the file descriptor, this
+ // class will be empty, otherwise it can hold a file descriptor.
+ class fd_s {
+ public:
+ fd_s(int fd) noexcept { }
+
+ DASYNQ_EMPTY_BODY
+ };
+
+ // File descriptor reference (passed to event callback). If the mechanism can return the
+ // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor
+ // must be stored in an fd_s instance.
+ class fd_r {
+ int fd;
+ public:
+ int getFd(fd_s ss)
+ {
+ return fd;
+ }
+ fd_r(int nfd) : fd(nfd)
+ {
+ }
+ };
+
+ constexpr static bool has_bidi_fd_watch = false;
+ constexpr static bool has_separate_rw_fd_watches = true;
+ // requires interrupt after adding/enabling an fd:
+ constexpr static bool interrupt_after_fd_add = true;
+ constexpr static bool interrupt_after_signal_add = true;
+ constexpr static bool supports_non_oneshot_fd = false;
+};
+
+namespace dprivate {
+namespace select_mech {
+
+// We need to declare and define a non-static data variable, "siginfo_p", in this header, without
+// violating the "one definition rule". The only way to do that is via a template, even though we
+// don't otherwise need a template here:
+template <typename T = decltype(nullptr)> class sig_capture_templ
+{
+ public:
+ static siginfo_t siginfo_cap;
+ static sigjmp_buf rjmpbuf;
+
+ static void signal_handler(int signo, siginfo_t *siginfo, void *v)
+ {
+ siginfo_cap = *siginfo;
+ siglongjmp(rjmpbuf, 1);
+ }
+};
+template <typename T> siginfo_t sig_capture_templ<T>::siginfo_cap;
+template <typename T> sigjmp_buf sig_capture_templ<T>::rjmpbuf;
+
+using sig_capture = sig_capture_templ<>;
+
+inline void prepare_signal(int signo)
+{
+ struct sigaction the_action;
+ the_action.sa_sigaction = sig_capture::signal_handler;
+ the_action.sa_flags = SA_SIGINFO;
+ sigfillset(&the_action.sa_mask);
+
+ sigaction(signo, &the_action, nullptr);
+}
+
+inline sigjmp_buf &get_sigreceive_jmpbuf()
+{
+ return sig_capture::rjmpbuf;
+}
+
+inline void unprep_signal(int signo)
+{
+ signal(signo, SIG_DFL);
+}
+
+inline siginfo_t * get_siginfo()
+{
+ return &sig_capture::siginfo_cap;
+}
+
+} } // namespace dasynq :: select_mech
+
+template <class Base> class select_events : public Base
+{
+ fd_set read_set;
+ fd_set write_set;
+ //fd_set error_set; // logical OR of both the above
+ int max_fd = 0; // highest fd in any of the sets
+
+ sigset_t active_sigmask; // mask out unwatched signals i.e. active=0
+ void * sig_userdata[NSIG];
+
+ // userdata pointers in read and write respectively, for each fd:
+ std::vector<void *> rd_udata;
+ std::vector<void *> wr_udata;
+
+ // Base contains:
+ // lock - a lock that can be used to protect internal structure.
+ // receive*() methods will be called with lock held.
+ // receive_signal(sigdata_t &, user *) noexcept
+ // receive_fd_event(fd_r, user *, int flags) noexcept
+
+ using sigdata_t = select_traits::sigdata_t;
+ using fd_r = typename select_traits::fd_r;
+
+ void process_events(fd_set *read_set_p, fd_set *write_set_p, fd_set *error_set_p)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
+ // Note: if error is set, report read-ready.
+
+ for (int i = 0; i <= max_fd; i++) {
+ if (FD_ISSET(i, read_set_p) || FD_ISSET(i, error_set_p)) {
+ if (FD_ISSET(i, &read_set) && rd_udata[i] != nullptr) {
+ // report read
+ auto r = Base::receive_fd_event(*this, fd_r(i), rd_udata[i], IN_EVENTS);
+ if (std::get<0>(r) == 0) {
+ FD_CLR(i, &read_set);
+ }
+ }
+ }
+ }
+
+ for (int i = 0; i <= max_fd; i++) {
+ if (FD_ISSET(i, write_set_p)) {
+ if (FD_ISSET(i, &write_set) && wr_udata[i] != nullptr) {
+ // report write
+ auto r = Base::receive_fd_event(*this, fd_r(i), wr_udata[i], OUT_EVENTS);
+ if (std::get<0>(r) == 0) {
+ FD_CLR(i, &write_set);
+ }
+ }
+ }
+ }
+ }
+
+ public:
+
+ /**
+ * select_events constructor.
+ *
+ * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
+ */
+ select_events()
+ {
+ FD_ZERO(&read_set);
+ FD_ZERO(&write_set);
+ sigfillset(&active_sigmask);
+ Base::init(this);
+ }
+
+ ~select_events()
+ {
+ }
+
+ // fd: file descriptor to watch
+ // userdata: data to associate with descriptor
+ // flags: IN_EVENTS | OUT_EVENTS | ONE_SHOT
+ // (only one of IN_EVENTS/OUT_EVENTS can be specified)
+ // soft_fail: true if unsupported file descriptors should fail by returning false instead
+ // of throwing an exception
+ // returns: true on success; false if file descriptor type isn't supported and emulate == true
+ // throws: std::system_error or std::bad_alloc on failure
+ bool add_fd_watch(int fd, void *userdata, int flags, bool enabled = true, bool soft_fail = false)
+ {
+ if (fd >= FD_SETSIZE) {
+ throw std::system_error(EMFILE, std::system_category());
+ }
+
+ if (flags & IN_EVENTS) {
+ FD_SET(fd, &read_set);
+ if (size_t(fd) >= rd_udata.size()) {
+ rd_udata.resize(fd + 1);
+ }
+ rd_udata[fd] = userdata;
+ }
+ else {
+ FD_SET(fd, &write_set);
+ if (size_t(fd) >= wr_udata.size()) {
+ wr_udata.resize(fd + 1);
+ }
+ wr_udata[fd] = userdata;
+ }
+
+ max_fd = std::max(fd, max_fd);
+
+ return true;
+ }
+
+ // returns: 0 on success
+ // IN_EVENTS if in watch requires emulation
+ // OUT_EVENTS if out watch requires emulation
+ int add_bidi_fd_watch(int fd, void *userdata, int flags, bool emulate = false)
+ {
+ if (fd >= FD_SETSIZE) {
+ throw std::system_error(EMFILE, std::system_category());
+ }
+
+ if (flags & IN_EVENTS) {
+ FD_SET(fd, &read_set);
+ if (size_t(fd) >= rd_udata.size()) {
+ rd_udata.resize(fd + 1);
+ }
+ rd_udata[fd] = userdata;
+ }
+ if (flags & OUT_EVENTS) {
+ FD_SET(fd, &write_set);
+ if (size_t(fd) >= wr_udata.size()) {
+ wr_udata.resize(fd + 1);
+ }
+ wr_udata[fd] = userdata;
+ }
+
+ max_fd = std::max(fd, max_fd);
+
+ return 0;
+ }
+
+ // flags specifies which watch to remove; ignored if the loop doesn't support
+ // separate read/write watches.
+ void remove_fd_watch_nolock(int fd, int flags)
+ {
+ if (flags & IN_EVENTS) {
+ FD_CLR(fd, &read_set);
+ rd_udata[fd] = nullptr;
+ }
+ if (flags & OUT_EVENTS) {
+ FD_CLR(fd, &write_set);
+ wr_udata[fd] = nullptr;
+ }
+ }
+
+ void remove_fd_watch(int fd, int flags)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ remove_fd_watch_nolock(fd, flags);
+ }
+
+ void remove_bidi_fd_watch(int fd) noexcept
+ {
+ FD_CLR(fd, &read_set);
+ FD_CLR(fd, &write_set);
+ }
+
+ void enable_fd_watch_nolock(int fd, void *userdata, int flags)
+ {
+ if (flags & IN_EVENTS) {
+ FD_SET(fd, &read_set);
+ }
+ else {
+ FD_SET(fd, &write_set);
+ }
+ }
+
+ void enable_fd_watch(int fd, void *userdata, int flags)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ enable_fd_watch_nolock(fd, userdata, flags);
+ }
+
+ void disable_fd_watch_nolock(int fd, int flags)
+ {
+ if (flags & IN_EVENTS) {
+ FD_CLR(fd, &read_set);
+ }
+ else {
+ FD_CLR(fd, &write_set);
+ }
+ }
+
+ void disable_fd_watch(int fd, int flags)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ disable_fd_watch_nolock(fd, flags);
+ }
+
+ // Note signal should be masked before call.
+ void add_signal_watch(int signo, void *userdata)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ add_signal_watch_nolock(signo, userdata);
+ }
+
+ // Note signal should be masked before call.
+ void add_signal_watch_nolock(int signo, void *userdata)
+ {
+ sig_userdata[signo] = userdata;
+ sigdelset(&active_sigmask, signo);
+ dprivate::select_mech::prepare_signal(signo);
+ }
+
+ // Note, called with lock held:
+ void rearm_signal_watch_nolock(int signo, void *userdata) noexcept
+ {
+ sig_userdata[signo] = userdata;
+ sigdelset(&active_sigmask, signo);
+ }
+
+ void remove_signal_watch_nolock(int signo) noexcept
+ {
+ dprivate::select_mech::unprep_signal(signo);
+ sigaddset(&active_sigmask, signo);
+ sig_userdata[signo] = nullptr;
+ // No need to signal other threads
+ }
+
+ void remove_signal_watch(int signo) noexcept
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ remove_signal_watch_nolock(signo);
+ }
+
+ public:
+
+ // If events are pending, process an unspecified number of them.
+ // If no events are pending, wait until one event is received and
+ // process this event (and possibly any other events received
+ // simultaneously).
+ // If processing an event removes a watch, there is a possibility
+ // that the watched event will still be reported (if it has
+ // occurred) before pull_events() returns.
+ //
+ // do_wait - if false, returns immediately if no events are
+ // pending.
+ void pull_events(bool do_wait) noexcept
+ {
+ using namespace dprivate::select_mech;
+
+ struct timespec ts;
+ ts.tv_sec = 0;
+ ts.tv_nsec = 0;
+
+ fd_set read_set_c;
+ fd_set write_set_c;
+ fd_set err_set;
+
+ Base::lock.lock();
+ read_set_c = read_set;
+ write_set_c = write_set;
+ err_set = read_set;
+
+ sigset_t sigmask;
+ this->sigmaskf(SIG_UNBLOCK, nullptr, &sigmask);
+ // This is horrible, but hopefully will be optimised well. POSIX gives no way to combine signal
+ // sets other than this.
+ for (int i = 1; i < NSIG; i++) {
+ if (! sigismember(&active_sigmask, i)) {
+ sigdelset(&sigmask, i);
+ }
+ }
+ int nfds = max_fd + 1;
+ Base::lock.unlock();
+
+ volatile bool was_signalled = false;
+
+ // using sigjmp/longjmp is ugly, but there is no other way. If a signal that we're watching is
+ // received during polling, it will longjmp back to here:
+ if (sigsetjmp(get_sigreceive_jmpbuf(), 1) != 0) {
+ std::atomic_signal_fence(std::memory_order_acquire);
+ auto * sinfo = get_siginfo();
+ sigdata_t sigdata;
+ sigdata.info = *sinfo;
+ Base::lock.lock();
+ void *udata = sig_userdata[sinfo->si_signo];
+ if (udata != nullptr && Base::receive_signal(*this, sigdata, udata)) {
+ sigaddset(&sigmask, sinfo->si_signo);
+ sigaddset(&active_sigmask, sinfo->si_signo);
+ }
+ Base::lock.unlock();
+ was_signalled = true;
+ }
+
+ if (was_signalled) {
+ do_wait = false;
+ }
+
+ std::atomic_signal_fence(std::memory_order_release);
+
+ int r = pselect(nfds, &read_set_c, &write_set_c, &err_set, do_wait ? nullptr : &ts, &sigmask);
+ if (r == -1 || r == 0) {
+ // signal or no events
+ return;
+ }
+
+ process_events(&read_set_c, &write_set_c, &err_set);
+ }
+};
+
+} // end namespace
#include <utility>
#include <new>
-// Vector with possibility to shrink capacity arbitrarily
+// Vector with possibility to shrink capacity arbitrarily.
+//
+// The standard vector (std::vector) only allows shrinking a vector's capacity to its current size. In cases
+// where we need to keep some reserved capacity beyond the current size, we need an alternative solution: hence,
+// this class, svector.
namespace dasynq {
class svector
{
private:
- T * array;
+ union vec_node {
+ T elem;
+
+ vec_node() { }
+ ~vec_node() { }
+ };
+
+ vec_node * array;
size_t size_v;
size_t capacity_v;
if (size_v == capacity_v) {
// double capacity now:
if (capacity_v == 0) capacity_v = 1;
- T * new_array = new T[capacity_v * 2];
+ vec_node * new_array = new vec_node[capacity_v * 2];
for (size_t i = 0; i < size_v; i++) {
- new (&new_array[i]) T(std::move(array[i]));
- array[i].T::~T();
+ new (&new_array[i].elem) T(std::move(array[i].elem));
+ array[i].elem.T::~T();
}
delete[] array;
array = new_array;
size_v = other.size_v;
array = new T[capacity_v];
for (size_t i = 0; i < size_v; i++) {
- new (&array[i]) T(other[i]);
+ new (&array[i].elem) T(other[i].elem);
}
}
~svector()
{
for (size_t i = 0; i < size_v; i++) {
- array[i].T::~T();
+ array[i].elem.T::~T();
}
delete[] array;
}
void push_back(const T &t)
{
check_capacity();
- new (&array[size_v]) T(t);
+ new (&array[size_v].elem) T(t);
size_v++;
}
void push_back(T &&t)
{
check_capacity();
- new (&array[size_v]) T(t);
+ new (&array[size_v].elem) T(t);
size_v++;
}
void emplace_back(U... args)
{
check_capacity();
- new (&array[size_v]) T(args...);
+ new (&array[size_v].elem) T(args...);
size_v++;
}
T &operator[](size_t index)
{
- return array[index];
+ return array[index].elem;
}
const T &operator[](size_t index) const
{
- return array[index];
+ return array[index].elem;
}
size_t size() const
void reserve(size_t amount)
{
if (capacity_v < amount) {
- T * new_array = new T[amount];
+ vec_node * new_array = new vec_node[amount];
for (size_t i = 0; i < size_v; i++) {
- new (&new_array[i]) T(std::move(array[i]));
- array[i].T::~T();
+ new (&new_array[i].elem) T(std::move(array[i].elem));
+ array[i].elem.T::~T();
}
delete[] array;
array = new_array;
void shrink_to(size_t amount)
{
if (capacity_v > amount) {
- T * new_array = new(std::nothrow) T[amount];
+ vec_node * new_array = new(std::nothrow) vec_node[amount];
if (new_array == nullptr) {
return;
}
for (size_t i = 0; i < size_v; i++) {
- new (&new_array[i]) T(std::move(array[i]));
- array[i].T::~T();
+ new (&new_array[i].elem) T(std::move(array[i].elem));
+ array[i].elem.T::~T();
}
delete[] array;
array = new_array;
T &back()
{
- return array[size_v - 1];
+ return array[size_v - 1].elem;
}
T* begin()
{
- return array;
+ return reinterpret_cast<T *>(array);
}
const T *begin() const
{
- return array;
+ return reinterpret_cast<const T *>(array);
}
T* end()
{
- return array + size_v;
+ return reinterpret_cast<T *>(array + size_v);
}
const T *end() const
{
- return array + size_v;
+ return reinterpret_cast<const T *>(array + size_v);
}
};
};
template <typename T>
- void receive_fd_event(T &loop_mech, typename traits_t::fd_r fd_r_a, void * userdata, int flags)
+ std::tuple<int, typename traits_t::fd_s>
+ receive_fd_event(T &loop_mech, typename traits_t::fd_r fd_r_a, void * userdata, int flags)
{
if (userdata == &timerfd_fd) {
process_timer(clock_type::MONOTONIC, timerfd_fd);
+ return std::make_tuple(IN_EVENTS, typename traits_t::fd_s(timerfd_fd));
}
else if (userdata == &systemtime_fd) {
process_timer(clock_type::SYSTEM, systemtime_fd);
+ if (Base::traits_t::supports_non_oneshot_fd) {
+ return std::make_tuple(0, typename traits_t::fd_s(systemtime_fd));
+ }
+ return std::make_tuple(IN_EVENTS, typename traits_t::fd_s(systemtime_fd));
}
else {
- Base::receive_fd_event(loop_mech, fd_r_a, userdata, flags);
+ return Base::receive_fd_event(loop_mech, fd_r_a, userdata, flags);
}
}
using loop_traits_t = epoll_traits;
}
#else
-#error No loop backened defined - see dasynq-config.h
+#include "dasynq-select.h"
+#if _POSIX_TIMERS > 0
+#include "dasynq-posixtimer.h"
+namespace dasynq {
+ template <typename T> using timer_events = posix_timer_events<T>;
+}
+#else
+#include "dasynq-itimer.h"
+namespace dasynq {
+ template <typename T> using timer_events = itimer_events<T>;
+}
+#endif
+#include "dasynq-childproc.h"
+namespace dasynq {
+ template <typename T> using loop_t = select_events<interrupt_channel<timer_events<child_proc_events<T>>>>;
+ using loop_traits_t = select_traits;
+}
#endif
#include <atomic>
return true;
}
+ // Receive fd event delivered from backend mechansim. Returns the desired watch mask, as per
+ // set_fd_enabled, which can be used to leave the watch disabled, re-enable it or re-enable
+ // one direction of a bi-directional watcher.
template <typename T>
- void receive_fd_event(T &loop_mech, typename Traits::fd_r fd_r, void * userdata, int flags) noexcept
+ std::tuple<int, typename Traits::fd_s> receive_fd_event(T &loop_mech, typename Traits::fd_r fd_r,
+ void * userdata, int flags) noexcept
{
base_fd_watcher * bfdw = static_cast<base_fd_watcher *>(userdata);
bfdw->event_flags |= flags;
+ typename Traits::fd_s watch_fd_s {bfdw->watch_fd};
base_watcher * bwatcher = bfdw;
queue_watcher(bwatcher);
- if (! traits_t::has_separate_rw_fd_watches) {
+ if (is_multi_watch && ! traits_t::has_separate_rw_fd_watches) {
// If this is a bidirectional fd-watch, it has been disabled in *both* directions
// as the event was delivered. However, the other direction should not be disabled
// yet, so we need to re-enable:
int in_out_mask = IN_EVENTS | OUT_EVENTS;
- if (is_multi_watch && (bfdw->watch_flags & in_out_mask) != 0) {
+ if ((bfdw->watch_flags & in_out_mask) != 0) {
// We need to re-enable the other channel now:
- loop_mech.enable_fd_watch_nolock(bfdw->watch_fd, userdata,
- (bfdw->watch_flags & in_out_mask) | ONE_SHOT);
+ return std::make_tuple((bfdw->watch_flags & in_out_mask) | ONE_SHOT, watch_fd_s);
+ // We are the polling thread: don't need to interrupt polling, even if it would
+ // normally be required.
}
}
+
+ return std::make_tuple(0, watch_fd_s);
}
// Child process terminated. Called with both the main lock and the reaper lock held.
return r;
}
- // Queue a watcher for reomval, or issue "removed" callback to it.
+ // Queue a watcher for removal, or issue "removed" callback to it.
// Call with lock free.
void issue_delete(base_watcher *watcher) noexcept
{
// - The mutex only protects manipulation of the wait queues, and so should not
// be highly contended.
- mutex_t wait_lock; // wait lock, used to prevent multiple threads from waiting
- // on the event queue simultaneously.
+ mutex_t wait_lock; // protects the wait/attention queues
waitqueue<mutex_t> attn_waitqueue;
waitqueue<mutex_t> wait_waitqueue;
loop_mech.prepare_watcher(callBack);
try {
loop_mech.add_signal_watch_nolock(signo, callBack);
+ if (backend_traits_t::interrupt_after_signal_add) {
+ interrupt_if_necessary();
+ }
}
catch (...) {
loop_mech.release_watcher(callBack);
}
}
}
+ else if (enabled && backend_traits_t::interrupt_after_fd_add) {
+ interrupt_if_necessary();
+ }
}
catch (...) {
loop_mech.release_watcher(callback);
try {
loop_mech.prepare_watcher(&callback->out_watcher);
try {
+ bool do_interrupt = false;
if (backend_traits_t::has_separate_rw_fd_watches) {
int r = loop_mech.add_bidi_fd_watch(fd, callback, eventmask | ONE_SHOT, emulate);
if (r & IN_EVENTS) {
requeue_watcher(callback);
}
}
+ else if ((eventmask & IN_EVENTS) && backend_traits_t::interrupt_after_fd_add) {
+ do_interrupt = true;
+ }
+
if (r & OUT_EVENTS) {
callback->out_watcher.emulatefd = true;
if (eventmask & OUT_EVENTS) {
requeue_watcher(&callback->out_watcher);
}
}
+ else if ((eventmask & OUT_EVENTS) && backend_traits_t::interrupt_after_fd_add) {
+ do_interrupt = true;
+ }
}
else {
if (! loop_mech.add_fd_watch(fd, callback, eventmask | ONE_SHOT, true, emulate)) {
requeue_watcher(&callback->out_watcher);
}
}
+ else if (backend_traits_t::interrupt_after_fd_add) {
+ do_interrupt = true;
+ }
+ }
+
+ if (do_interrupt) {
+ interrupt_if_necessary();
}
}
catch (...) {
{
if (enabled) {
loop_mech.enable_fd_watch(fd, watcher, watch_flags | ONE_SHOT);
+ if (backend_traits_t::interrupt_after_fd_add) {
+ interrupt_if_necessary();
+ }
}
else {
loop_mech.disable_fd_watch(fd, watch_flags);
{
if (enabled) {
loop_mech.enable_fd_watch_nolock(fd, watcher, watch_flags | ONE_SHOT);
+ if (backend_traits_t::interrupt_after_fd_add) {
+ interrupt_if_necessary();
+ }
}
else {
loop_mech.disable_fd_watch_nolock(fd, watch_flags);
}
}
+ // Interrupt the current poll-waiter, if necessary - that is, if the loop is multi-thread safe, and if
+ // there is currently another thread polling the backend event mechanism.
+ void interrupt_if_necessary()
+ {
+ std::lock_guard<mutex_t> guard(wait_lock);
+ if (! attn_waitqueue.is_empty()) { // (always false for single-threaded loops)
+ loop_mech.interrupt_wait();
+ }
+ }
+
// Acquire the attention lock (when held, ensures that no thread is polling the AEN
// mechanism). This can be used to safely remove watches, since it is certain that
// notification callbacks won't be run while the attention lock is held.
// Called with lock held
if (rearm_type == rearm::REARM) {
loop_mech.rearm_signal_watch_nolock(bsw->siginfo.get_signo(), bsw);
+ if (backend_traits_t::interrupt_after_signal_add) {
+ interrupt_if_necessary();
+ }
}
else if (rearm_type == rearm::REMOVE) {
loop_mech.remove_signal_watch_nolock(bsw->siginfo.get_signo());
if (bdfw->watch_flags & IN_EVENTS) {
bdfw->watch_flags &= ~IN_EVENTS;
if (! emulatedfd) {
- loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags);
+ set_fd_enabled_nolock(bdfw, bdfw->watch_fd, bdfw->watch_flags,
+ bdfw->watch_flags != 0);
}
}
return rearm::NOOP;
if (! emulatedfd) {
if (! backend_traits_t::has_separate_rw_fd_watches) {
- int watch_flags = bdfw->watch_flags;
- // without separate r/w watches, enable_fd_watch actually sets
- // which sides are enabled (i.e. can be used to disable):
- loop_mech.enable_fd_watch_nolock(bdfw->watch_fd,
- static_cast<base_watcher *>(bdfw),
- (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+ int watch_flags = bdfw->watch_flags & (IN_EVENTS | OUT_EVENTS);
+ set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags, watch_flags != 0);
}
else {
loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS);
if (! emulatedfd) {
if (! backend_traits_t::has_separate_rw_fd_watches) {
int watch_flags = bdfw->watch_flags;
- loop_mech.enable_fd_watch_nolock(bdfw->watch_fd,
- static_cast<base_watcher *>(bdfw),
- (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+ set_fd_enabled_nolock(bdfw, bdfw->watch_fd,
+ watch_flags & (IN_EVENTS | OUT_EVENTS), true);
}
else {
- loop_mech.enable_fd_watch_nolock(bdfw->watch_fd,
- static_cast<base_watcher *>(bdfw),
- IN_EVENTS | ONE_SHOT);
+ set_fd_enabled_nolock(bdfw, bdfw->watch_fd, IN_EVENTS, true);
}
}
else {
}
}
else if (rearm_type == rearm::REARM) {
- loop_mech.enable_fd_watch_nolock(bfw->watch_fd, bfw,
- (bfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+ set_fd_enabled_nolock(bfw, bfw->watch_fd,
+ bfw->watch_flags & (IN_EVENTS | OUT_EVENTS), true);
}
else if (rearm_type == rearm::DISARM) {
loop_mech.disable_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags);
if (! bdfw->read_removed) {
if (bdfw->watch_flags & OUT_EVENTS) {
bdfw->watch_flags &= ~OUT_EVENTS;
- loop_mech.enable_fd_watch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags);
+ set_fd_enabled_nolock(bdfw, bdfw->watch_fd, bdfw->watch_flags, true);
}
return rearm::NOOP;
}
if (! backend_traits_t::has_separate_rw_fd_watches) {
int watch_flags = bdfw->watch_flags;
- loop_mech.enable_fd_watch_nolock(bdfw->watch_fd,
- static_cast<base_watcher *>(bdfw),
- (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+ set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags & (IN_EVENTS | OUT_EVENTS), true);
}
else {
loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, OUT_EVENTS);
if (! backend_traits_t::has_separate_rw_fd_watches) {
int watch_flags = bdfw->watch_flags;
- loop_mech.enable_fd_watch_nolock(bdfw->watch_fd,
- static_cast<base_watcher *>(bdfw),
- (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+ set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags & (IN_EVENTS | OUT_EVENTS), true);
}
else {
- loop_mech.enable_fd_watch_nolock(bdfw->watch_fd,
- static_cast<base_watcher *>(bdfw),
- OUT_EVENTS | ONE_SHOT);
+ set_fd_enabled_nolock(bdfw, bdfw->watch_fd, OUT_EVENTS | ONE_SHOT, true);
}
}
return rearm_type;