#include "dasynq-config.h"
+// "kqueue"-based event loop mechanism.
+//
+// kqueue is available on BSDs and Mac OS X, though there are subtle differences from OS to OS.
+//
+// kqueue supports watching file descriptors (input and output as separate watches only),
+// signals, child processes, and timers. Unfortunately support for the latter two is imperfect;
+// it is not possible to reserve process watches in advance; timers can only be active, count
+// down immediately when created, and cannot be reset to another time. For timers especially
+// the problems are significant: we can't allocate timers in advance, and we can't even feasibly
+// manage our own timer queue via a single kqueue-backed timer. Therefore, an alternate timer
+// mechanism must be used together with kqueue.
+
namespace dasynq {
template <class Base> class KqueueLoop;
const static bool supports_childwatch_reservation = true;
};
-#if defined(__OpenBSD__)
+#if defined(__OpenBSD__) && _POSIX_REALTIME_SIGNALS <= 0
// OpenBSD has no sigtimedwait (or sigwaitinfo) but does have "__thrsigdivert", which is
// essentially an incomplete version of the same thing. Discussion with OpenBSD developer
-// Ted Unangst suggested that the siginfo_t structure returned might not always have all
-// fields set correctly. Furthermore there is a bug such that specifying a zero timeout (or
-// indeed any timeout less than a tick) results in NO timeout. We get around this by instead
-// specifying an *invalid* timeout, which won't error out if a signal is pending.
+// Ted Unangst suggested that the siginfo_t structure returned might not always have all fields
+// set correctly. Furthermore there is a bug (at least in 5.9) such that specifying a zero
+// timeout (or indeed any timeout less than a tick) results in NO timeout. We get around this by
+// instead specifying an *invalid* timeout, which won't error out if a signal is pending.
static inline int sigtimedwait(const sigset_t *ssp, siginfo_t *info, struct timespec *timeout)
{
// We know that we're only called with a timeout of 0 (which doesn't work properly) and
timeout->tv_nsec = 1000000001;
return __thrsigdivert(*ssp, info, timeout);
}
+#endif
+#if defined(__OpenBSD__) || _POSIX_REALTIME_SIGNALS > 0
static inline void prepare_signal(int signo) { }
static inline void unprep_signal(int signo) { }
sigaddset(&mask, signo);
return (sigtimedwait(&mask, siginfo, &timeout) != -1);
}
+#else
-#elif defined(__APPLE__)
+// If we have no sigtimedwait implementation, we have to retrieve signal data by establishing a
+// signal handler:
static siginfo_t * siginfo_p;
if (events[i].filter == EVFILT_SIGNAL) {
SigInfo siginfo;
if (get_siginfo(events[i].ident, &siginfo.info)
- && Base::receiveSignal(*this, siginfo, (void *)events[i].udata)) {
+ && Base::receive_signal(*this, siginfo, (void *)events[i].udata)) {
sigdelset(&sigmask, events[i].ident);
events[i].flags = EV_DISABLE;
}
kevent(kqfd, &kev, 1, nullptr, 0, nullptr);
}
- // flags: IN_EVENTS | OUT_EVENTS
- void addFdWatch(int fd, void *userdata, int flags, bool enabled = true)
+ // fd: file descriptor to watch
+ // userdata: data to associate with descriptor
+ // flags: IN_EVENTS | OUT_EVENTS | ONE_SHOT
+ // (only one of IN_EVENTS/OUT_EVENTS can be specified)
+ // soft_fail: true if unsupported file descriptors should fail by returning false instead
+ // of throwing an exception
+ // returns: true on success; false if file descriptor type isn't supported and soft_fail == true
+ // throws: std::system_error or std::bad_alloc on failure
+ bool addFdWatch(int fd, void *userdata, int flags, bool enabled = true, bool emulate = false)
{
- // TODO kqueue doesn't support EVFILT_WRITE on file fd's :/
- // Presumably they cause the kevent call to fail. We could maintain
- // a separate set and use poll() (urgh).
-
short filter = (flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE;
-
+
struct kevent kev;
EV_SET(&kev, fd, filter, EV_ADD | (enabled ? 0 : EV_DISABLE), 0, 0, userdata);
if (kevent(kqfd, &kev, 1, nullptr, 0, nullptr) == -1) {
+ // Note that kqueue supports EVFILT_READ on regular file fd's, but not EVFIL_WRITE.
+ if (filter == EVFILT_WRITE && errno == EINVAL) {
+ return false; // emulate
+ }
throw new std::system_error(errno, std::system_category());
}
+ return true;
}
-
- void addBidiFdWatch(int fd, void *userdata, int flags)
+
+ // returns: 0 on success
+ // IN_EVENTS if in watch requires emulation
+ // OUT_EVENTS if out watch requires emulation
+ int addBidiFdWatch(int fd, void *userdata, int flags, bool emulate = false)
{
+#ifdef EV_RECEIPT
struct kevent kev[2];
+ struct kevent kev_r[2];
+ short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT;
+ short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT;
+ EV_SET(&kev[0], fd, EVFILT_READ, rflags, 0, 0, userdata);
+ EV_SET(&kev[1], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
+
+ int r = kevent(kqfd, kev, 2, kev_r, 2, nullptr);
+
+ if (r == -1) {
+ throw new std::system_error(errno, std::system_category());
+ }
+
+ // Some possibilities:
+ // - both ends failed. We'll throw an error rather than allowing emulation.
+ // - read watch failed, write succeeded : should not happen.
+ // - read watch added, write failed: if emulate == true, succeed;
+ // if emulate == false, remove read and fail.
+
+ if (kev_r[0].data != 0) {
+ // read failed
+ throw new std::system_error(kev_r[0].data, std::system_category());
+ }
+
+ if (kev_r[1].data != 0) {
+ if (emulate) {
+ return OUT_EVENTS;
+ }
+ // remove read watch
+ EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
+ kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+ // throw exception
+ throw new std::system_error(kev_r[1].data, std::system_category());
+ }
+
+ return 0;
+#else
+ // OpenBSD doesn't have EV_RECEIPT: install the watches one at a time
+ struct kevent kev[1];
+
short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE);
short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE);
EV_SET(&kev[0], fd, EVFILT_READ, rflags, 0, 0, userdata);
- EV_SET(&kev[1], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
-
- if (kevent(kqfd, kev, 2, nullptr, 0, nullptr) == -1) {
+
+ int r = kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+
+ if (r == -1) {
throw new std::system_error(errno, std::system_category());
- }
+ }
+
+ EV_SET(&kev[0], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
+
+ r = kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+
+ if (r == -1) {
+ if (emulate) {
+ return OUT_EVENTS;
+ }
+ // remove read watch
+ EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
+ kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+ // throw exception
+ throw new std::system_error(errno, std::system_category());
+ }
+
+ return 0;
+#endif
}
// flags specifies which watch to remove; ignored if the loop doesn't support
struct kevent evt;
EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ADD, 0, 0, userdata);
- // TODO use EV_DISPATCH if available (not on OpenBSD)
+ // TODO use EV_DISPATCH if available (not on OpenBSD/OS X)
if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) {
throw new std::system_error(errno, std::system_category());
removeSignalWatch_nolock(signo);
}
- // If events are pending, process an unspecified number of them.
- // If no events are pending, wait until one event is received and
- // process this event (and possibly any other events received
- // simultaneously).
- // If processing an event removes a watch, there is a possibility
- // that the watched event will still be reported (if it has
- // occurred) before pullEvents() returns.
+ private:
+
+ // We actually need to check pending signals before polling the kqueue, since kqueue can
+ // count signals as they are delivered but the count is cleared when we poll the kqueue,
+ // meaning that signals might still be pending if they were queued multiple times at the
+ // last poll (since we report only one signal delivery at a time and the watch is
+ // automatically disabled each time).
//
- // do_wait - if false, returns immediately if no events are
- // pending.
- void pullEvents(bool do_wait)
+ // The check is not necessary on systems that don't queue signals.
+void pull_signals()
{
- // We actually need to check pending signals, since
- // kqueue can count signals as they are delivered but the count is
- // cleared when we poll the kqueue, meaning that signals might still
- // be pending if they were queued multiple times at the last poll.
-
+#if _POSIX_REALTIME_SIGNALS > 0
// TODO we should only poll for signals that *have* been reported
// as being raised more than once prior via kevent, rather than all
- // signals that have been registered - in many cases that will allow
+ // signals that have been registered - in many cases that may allow
// us to skip the sigtimedwait call altogether.
-
- // The check is not necessary on systems that don't queue signals.
-
-#if _POSIX_REALTIME_SIGNALS > 0
{
std::lock_guard<decltype(Base::lock)> guard(Base::lock);
sigdelset(&sigmask, rsigno);
// TODO accumulate and disable multiple filters with a single kevents call
// rather than disabling each individually
- setFilterEnabled(EVFILT_SIGNAL, rsigno, false);
+ setFilterEnabled(EVFILT_SIGNAL, rsigno, sigdataMap[rsigno], false);
}
rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout);
}
}
#endif
-
- struct kevent events[16];
- struct timespec ts;
- ts.tv_sec = 0;
- ts.tv_nsec = 0;
- int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts);
- if (r == -1 || r == 0) {
- // signal or no events
- return;
- }
-
- processEvents(events, r);
}
- // If events are pending, process one of them.
+ public:
+
+ // If events are pending, process an unspecified number of them.
// If no events are pending, wait until one event is received and
- // process this event.
+ // process this event (and possibly any other events received
+ // simultaneously).
+ // If processing an event removes a watch, there is a possibility
+ // that the watched event will still be reported (if it has
+ // occurred) before pullEvents() returns.
//
// do_wait - if false, returns immediately if no events are
- // pending.
- void pullOneEvent(bool do_wait)
+ // pending.
+ void pullEvents(bool do_wait)
{
- // TODO must check for pending signals as per pullEvents()
- struct kevent events[1];
+ pull_signals();
+
+ struct kevent events[16];
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 0;
- int r = kevent(kqfd, nullptr, 0, events, 1, do_wait ? nullptr : &ts);
+ int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts);
if (r == -1 || r == 0) {
// signal or no events
return;
}
-
- processEvents(events, r);
+
+ do {
+ processEvents(events, r);
+ r = kevent(kqfd, nullptr, 0, events, 16, &ts);
+ } while (r > 0);
}
};
--- /dev/null
+#include <vector>
+#include <utility>
+
+#include <sys/time.h>
+#include <time.h>
+#include <signal.h>
+
+#include "dasynq-config.h"
+#include "dasynq-timerbase.h"
+
+namespace dasynq {
+
+// Timer implementation based on POSIX create_timer et al.
+// May require linking with -lrt
+
+template <class Base> class PosixTimerEvents : public timer_base<Base>
+{
+ private:
+ timer_queue_t real_timer_queue;
+ timer_queue_t mono_timer_queue;
+
+ timer_t real_timer;
+ timer_t mono_timer;
+
+ // Set the timeout to match the first timer in the queue (disable the timer if there are no
+ // active timers).
+ void set_timer_from_queue(timer_t &timer, timer_queue_t &timer_queue)
+ {
+ struct itimerspec newalarm;
+
+ if (timer_queue.empty()) {
+ newalarm.it_value = {0, 0};
+ newalarm.it_interval = {0, 0};
+ timer_settime(timer, TIMER_ABSTIME, &newalarm, nullptr);
+ return;
+ }
+
+ newalarm.it_interval = {0, 0};
+ newalarm.it_value = timer_queue.get_root_priority();
+ timer_settime(timer, TIMER_ABSTIME, &newalarm, nullptr);
+ }
+
+ protected:
+
+ using SigInfo = typename Base::SigInfo;
+
+ template <typename T>
+ bool receive_signal(T & loop_mech, SigInfo &siginfo, void *userdata)
+ {
+ if (siginfo.get_signo() == SIGALRM) {
+ struct timespec curtime;
+
+ if (! real_timer_queue.empty()) {
+ clock_gettime(CLOCK_REALTIME, &curtime);
+ timer_base<Base>::process_timer_queue(real_timer_queue, curtime);
+ set_timer_from_queue(real_timer, real_timer_queue);
+ }
+
+ if (! mono_timer_queue.empty()) {
+ clock_gettime(CLOCK_MONOTONIC, &curtime);
+ timer_base<Base>::process_timer_queue(mono_timer_queue, curtime);
+ set_timer_from_queue(mono_timer, mono_timer_queue);
+ }
+
+ // loop_mech.rearmSignalWatch_nolock(SIGALRM);
+ return false; // don't disable signal watch
+ }
+ else {
+ return Base::receive_signal(loop_mech, siginfo, userdata);
+ }
+ }
+
+ timer_queue_t &queue_for_clock(clock_type clock)
+ {
+ switch (clock) {
+ case clock_type::MONOTONIC:
+ return mono_timer_queue;
+ case clock_type::SYSTEM:
+ return real_timer_queue;
+ default:
+ DASYNQ_UNREACHABLE;
+ }
+ }
+
+ timer_t &timer_for_clock(clock_type clock)
+ {
+ switch (clock) {
+ case clock_type::MONOTONIC:
+ return mono_timer;
+ case clock_type::SYSTEM:
+ return real_timer;
+ default:
+ DASYNQ_UNREACHABLE;
+ }
+ }
+
+ public:
+
+ template <typename T> void init(T *loop_mech)
+ {
+ sigset_t sigmask;
+ sigprocmask(SIG_UNBLOCK, nullptr, &sigmask);
+ sigaddset(&sigmask, SIGALRM);
+ sigprocmask(SIG_SETMASK, &sigmask, nullptr);
+ loop_mech->addSignalWatch(SIGALRM, nullptr);
+
+ struct sigevent timer_sigevent;
+ timer_sigevent.sigev_notify = SIGEV_SIGNAL;
+ timer_sigevent.sigev_signo = SIGALRM;
+ timer_sigevent.sigev_value.sival_int = 0;
+
+ // Create the timers; throw std::system_error if we can't.
+ if (timer_create(CLOCK_REALTIME, &timer_sigevent, &real_timer) == 0) {
+ if (timer_create(CLOCK_MONOTONIC, &timer_sigevent, &mono_timer) != 0) {
+ timer_delete(real_timer);
+ throw std::system_error(errno, std::system_category());
+ }
+ }
+ else {
+ throw std::system_error(errno, std::system_category());
+ }
+
+ Base::init(loop_mech);
+ }
+
+ void addTimer(timer_handle_t &h, void *userdata, clock_type clock = clock_type::MONOTONIC)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ queue_for_clock(clock).allocate(h, userdata);
+ }
+
+ void removeTimer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ removeTimer_nolock(timer_id, clock);
+ }
+
+ void removeTimer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
+ {
+ timer_queue_t &timer_queue = queue_for_clock(clock);
+
+ if (timer_queue.is_queued(timer_id)) {
+ timer_queue.remove(timer_id);
+ }
+ timer_queue.deallocate(timer_id);
+ }
+
+ // starts (if not started) a timer to timeout at the given time. Resets the expiry count to 0.
+ // enable: specifies whether to enable reporting of timeouts/intervals
+ void setTimer(timer_handle_t &timer_id, struct timespec &timeout, struct timespec &interval,
+ bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
+ timer_queue_t &timer_queue = queue_for_clock(clock);
+ timer_t &timer = timer_for_clock(clock);
+
+ auto &ts = timer_queue.node_data(timer_id);
+ ts.interval_time = interval;
+ ts.expiry_count = 0;
+ ts.enabled = enable;
+
+ if (timer_queue.is_queued(timer_id)) {
+ // Already queued; alter timeout
+ if (timer_queue.set_priority(timer_id, timeout)) {
+ set_timer_from_queue(timer, timer_queue);
+ }
+ }
+ else {
+ if (timer_queue.insert(timer_id, timeout)) {
+ set_timer_from_queue(timer, timer_queue);
+ }
+ }
+ }
+
+ // Set timer relative to current time:
+ void setTimerRel(timer_handle_t &timer_id, struct timespec &timeout, struct timespec &interval,
+ bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
+ {
+ // TODO consider caching current time somehow; need to decide then when to update cached value.
+ struct timespec curtime;
+ int posix_clock_id = (clock == clock_type::MONOTONIC) ? CLOCK_MONOTONIC : CLOCK_REALTIME;
+ clock_gettime(posix_clock_id, &curtime);
+ curtime.tv_sec += timeout.tv_sec;
+ curtime.tv_nsec += timeout.tv_nsec;
+ if (curtime.tv_nsec > 1000000000) {
+ curtime.tv_nsec -= 1000000000;
+ curtime.tv_sec++;
+ }
+ setTimer(timer_id, curtime, interval, enable, clock);
+ }
+
+ // Enables or disabling report of timeouts (does not stop timer)
+ void enableTimer(timer_handle_t &timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ enableTimer_nolock(timer_id, enable, clock);
+ }
+
+ void enableTimer_nolock(timer_handle_t &timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
+ {
+ timer_queue_t &timer_queue = queue_for_clock(clock);
+
+ auto &node_data = timer_queue.node_data(timer_id);
+ auto expiry_count = node_data.expiry_count;
+ if (expiry_count != 0) {
+ node_data.expiry_count = 0;
+ Base::receiveTimerExpiry(timer_id, node_data.userdata, expiry_count);
+ }
+ else {
+ timer_queue.node_data(timer_id).enabled = enable;
+ }
+ }
+
+ void stop_timer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ stop_timer_nolock(timer_id, clock);
+ }
+
+ void stop_timer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
+ {
+ timer_queue_t &timer_queue = queue_for_clock(clock);
+ timer_t &timer = timer_for_clock(clock);
+
+ if (timer_queue.is_queued(timer_id)) {
+ bool was_first = (&timer_queue.get_root()) == &timer_id;
+ timer_queue.remove(timer_id);
+ if (was_first) {
+ set_timer_from_queue(timer, timer_queue);
+ }
+ }
+ }
+
+ void get_time(timespec &ts, clock_type clock, bool force_update) noexcept
+ {
+ int posix_clock_id = (clock == clock_type::MONOTONIC) ? CLOCK_MONOTONIC : CLOCK_REALTIME;
+ clock_gettime(posix_clock_id, &ts);
+ }
+
+ ~PosixTimerEvents()
+ {
+ timer_delete(mono_timer);
+ timer_delete(real_timer);
+ }
+};
+
+}
// Loop and LoopTraits defined already; used for testing
#elif defined(DASYNQ_HAVE_KQUEUE)
#include "dasynq-kqueue.h"
+#if _POSIX_TIMERS > 0
+#include "dasynq-posixtimer.h"
+namespace dasynq {
+ template <typename T> using TimerEvents = PosixTimerEvents<T>;
+}
+#else
#include "dasynq-itimer.h"
+namespace dasynq {
+ template <typename T> using TimerEvents = ITimerEvents<T>;
+}
+#endif
#include "dasynq-childproc.h"
namespace dasynq {
- template <typename T> using Loop = KqueueLoop<interrupt_channel<ITimerEvents<ChildProcEvents<T>>>>;
+ template <typename T> using Loop = KqueueLoop<interrupt_channel<TimerEvents<ChildProcEvents<T>>>>;
using LoopTraits = KqueueTraits;
}
#elif defined(DASYNQ_HAVE_EPOLL)
namespace dasynq {
-#ifdef __APPLE__
+#if HAVE_PIPE2 == 0
inline int pipe2(int filedes[2], int flags)
{
if (pipe(filedes) == -1) {
#endif
namespace dprivate {
- class BaseWatcher;
+ class base_watcher;
}
-using PrioQueue = NaryHeap<dprivate::BaseWatcher *, int>;
+using PrioQueue = NaryHeap<dprivate::base_watcher *, int>;
inline namespace {
constexpr int DEFAULT_PRIORITY = 50;
template <typename, typename> class child_proc_watcher_impl;
template <typename, typename> class timer_impl;
- enum class WatchType
+ enum class watch_type_t
{
SIGNAL,
FD,
constexpr static int multi_watch = 4;
// Represents a queued event notification. Various event watchers derive from this type.
- class BaseWatcher
+ class base_watcher
{
template <typename T_Mutex, typename Traits> friend class EventDispatch;
template <typename T_Mutex, template <typename> class, typename> friend class dasynq::event_loop;
- friend inline void basewatcher_set_active(BaseWatcher &watcher, bool active);
- friend inline bool basewatcher_get_deleteme(const BaseWatcher &watcher);
+ friend inline void basewatcher_set_active(base_watcher &watcher, bool active);
+ friend inline bool basewatcher_get_deleteme(const base_watcher &watcher);
+ friend inline bool basewatcher_get_emulatefd(const base_watcher &watcher);
protected:
- WatchType watchType;
+ watch_type_t watchType;
int active : 1; // currently executing handler?
int deleteme : 1; // delete when handler finished?
+ int emulatefd : 1; // emulate file watch (by re-queueing)
+ int emulate_enabled : 1; // whether an emulated watch is enabled
PrioQueue::handle_t heap_handle;
int priority;
- static void set_priority(BaseWatcher &p, int prio)
+ static void set_priority(base_watcher &p, int prio)
{
p.priority = prio;
}
{
active = false;
deleteme = false;
+ emulatefd = false;
+ emulate_enabled = false;
PrioQueue::init_handle(heap_handle);
priority = DEFAULT_PRIORITY;
}
- BaseWatcher(WatchType wt) noexcept : watchType(wt) { }
+ base_watcher(watch_type_t wt) noexcept : watchType(wt) { }
virtual void dispatch(void *loop_ptr) noexcept { };
virtual void dispatch_second(void *loop_ptr) noexcept { }
- virtual ~BaseWatcher() noexcept { }
+ virtual ~base_watcher() noexcept { }
// Called when the watcher has been removed.
// It is guaranteed by the caller that:
}
};
- inline void basewatcher_set_active(BaseWatcher &watcher, bool active)
+ inline void basewatcher_set_active(base_watcher &watcher, bool active)
{
watcher.active = active;
}
- inline bool basewatcher_get_deleteme(const BaseWatcher &watcher)
+ inline bool basewatcher_get_deleteme(const base_watcher &watcher)
{
return watcher.deleteme;
}
+ inline bool basewatcher_get_emulatefd(const base_watcher &watcher)
+ {
+ return watcher.emulatefd;
+ }
+
// Base signal event - not part of public API
template <typename T_Mutex, typename Traits>
- class BaseSignalWatcher : public BaseWatcher
+ class base_signal_watcher : public base_watcher
{
friend class EventDispatch<T_Mutex, Traits>;
template <typename, template <typename> class, typename> friend class dasynq::event_loop;
protected:
typename Traits::SigInfo siginfo;
- BaseSignalWatcher() : BaseWatcher(WatchType::SIGNAL) { }
+ base_signal_watcher() : base_watcher(watch_type_t::SIGNAL) { }
public:
using siginfo_t = typename Traits::SigInfo;
};
template <typename T_Mutex>
- class BaseFdWatcher : public BaseWatcher
+ class base_fd_watcher : public base_watcher
{
template <typename, typename Traits> friend class EventDispatch;
template <typename, template <typename> class, typename> friend class dasynq::event_loop;
// the events that the watcher is currently watching (i.e. specifies which
// halves of the Bidi watcher are enabled).
- BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { }
+ base_fd_watcher() noexcept : base_watcher(watch_type_t::FD) { }
};
template <typename T_Mutex>
- class BaseBidiFdWatcher : public BaseFdWatcher<T_Mutex>
+ class base_bidi_fd_watcher : public base_fd_watcher<T_Mutex>
{
template <typename, typename Traits> friend class EventDispatch;
template <typename, template <typename> class, typename> friend class dasynq::event_loop;
// The main instance is the "input" watcher only; we keep a secondary watcher
// with a secondary set of flags for the "output" watcher:
- BaseWatcher outWatcher {WatchType::SECONDARYFD};
+ base_watcher outWatcher {watch_type_t::SECONDARYFD};
int read_removed : 1; // read watch removed?
int write_removed : 1; // write watch removed?
};
template <typename T_Mutex>
- class BaseChildWatcher : public BaseWatcher
+ class base_child_watcher : public base_watcher
{
template <typename, typename Traits> friend class EventDispatch;
template <typename, template <typename> class, typename> friend class dasynq::event_loop;
pid_t watch_pid;
int child_status;
- BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { }
+ base_child_watcher() : base_watcher(watch_type_t::CHILD) { }
};
template <typename T_Mutex>
- class BaseTimerWatcher : public BaseWatcher
+ class base_timer_watcher : public base_watcher
{
template <typename, typename Traits> friend class EventDispatch;
template <typename, template <typename> class, typename> friend class dasynq::event_loop;
int intervals;
clock_type clock;
- BaseTimerWatcher() : BaseWatcher(WatchType::TIMER)
+ base_timer_watcher() : base_watcher(watch_type_t::TIMER)
{
init_timer_handle(timer_handle);
}
// Do standard post-dispatch processing for a watcher. This handles the case of removing or
// re-queing watchers depending on the rearm type.
- template <typename Loop> void post_dispatch(Loop &loop, BaseWatcher *watcher, rearm rearmType)
+ template <typename Loop> void post_dispatch(Loop &loop, base_watcher *watcher, rearm rearmType)
{
if (rearmType == rearm::REMOVE) {
loop.getBaseLock().unlock();
loop.getBaseLock().lock();
}
else if (rearmType == rearm::REQUEUE) {
- loop.requeueWatcher(watcher);
+ loop.requeue_watcher(watcher);
}
}
// queue data structure/pointer
PrioQueue event_queue;
- using BaseSignalWatcher = dasynq::dprivate::BaseSignalWatcher<T_Mutex,Traits>;
- using BaseFdWatcher = dasynq::dprivate::BaseFdWatcher<T_Mutex>;
- using BaseBidiFdWatcher = dasynq::dprivate::BaseBidiFdWatcher<T_Mutex>;
- using BaseChildWatcher = dasynq::dprivate::BaseChildWatcher<T_Mutex>;
- using BaseTimerWatcher = dasynq::dprivate::BaseTimerWatcher<T_Mutex>;
+ using BaseSignalWatcher = dasynq::dprivate::base_signal_watcher<T_Mutex,Traits>;
+ using BaseFdWatcher = dasynq::dprivate::base_fd_watcher<T_Mutex>;
+ using BaseBidiFdWatcher = dasynq::dprivate::base_bidi_fd_watcher<T_Mutex>;
+ using BaseChildWatcher = dasynq::dprivate::base_child_watcher<T_Mutex>;
+ using BaseTimerWatcher = dasynq::dprivate::base_timer_watcher<T_Mutex>;
// Add a watcher into the queuing system (but don't queue it)
// may throw: std::bad_alloc
- void prepare_watcher(BaseWatcher *bwatcher)
+ void prepare_watcher(base_watcher *bwatcher)
{
event_queue.allocate(bwatcher->heap_handle, bwatcher);
}
- void queueWatcher(BaseWatcher *bwatcher) noexcept
+ void queueWatcher(base_watcher *bwatcher) noexcept
{
event_queue.insert(bwatcher->heap_handle, bwatcher->priority);
}
- bool isQueued(BaseWatcher *bwatcher) noexcept
+ bool isQueued(base_watcher *bwatcher) noexcept
{
return event_queue.is_queued(bwatcher->heap_handle);
}
- void dequeueWatcher(BaseWatcher *bwatcher) noexcept
+ void dequeueWatcher(base_watcher *bwatcher) noexcept
{
if (event_queue.is_queued(bwatcher->heap_handle)) {
event_queue.remove(bwatcher->heap_handle);
}
// Remove watcher from the queueing system
- void release_watcher(BaseWatcher *bwatcher) noexcept
+ void release_watcher(base_watcher *bwatcher) noexcept
{
event_queue.deallocate(bwatcher->heap_handle);
}
// Receive a signal; return true to disable signal watch or false to leave enabled
template <typename T>
- bool receiveSignal(T &loop_mech, typename Traits::SigInfo & siginfo, void * userdata) noexcept
+ bool receive_signal(T &loop_mech, typename Traits::SigInfo & siginfo, void * userdata) noexcept
{
BaseSignalWatcher * bwatcher = static_cast<BaseSignalWatcher *>(userdata);
bwatcher->siginfo = siginfo;
bfdw->event_flags |= flags;
- BaseWatcher * bwatcher = bfdw;
+ base_watcher * bwatcher = bfdw;
bool is_multi_watch = bfdw->watch_flags & multi_watch;
if (is_multi_watch) {
}
// Pull a single event from the queue; returns nullptr if the queue is empty.
- BaseWatcher * pullEvent() noexcept
+ base_watcher * pullEvent() noexcept
{
if (event_queue.empty()) {
return nullptr;
}
auto & rhndl = event_queue.get_root();
- BaseWatcher *r = event_queue.node_data(rhndl);
+ base_watcher *r = event_queue.node_data(rhndl);
event_queue.pull_root();
return r;
}
- void issueDelete(BaseWatcher *watcher) noexcept
+ void issueDelete(base_watcher *watcher) noexcept
{
// This is only called when the attention lock is held, so if the watcher is not
// active/queued now, it cannot become active (and will not be reported with an event)
watcher->read_removed = true;
}
- BaseWatcher *secondary = &(watcher->outWatcher);
+ base_watcher *secondary = &(watcher->outWatcher);
if (secondary->active) {
secondary->deleteme = true;
release_watcher(watcher);
friend class dprivate::timer<my_event_loop_t>;
friend void dprivate::post_dispatch<my_event_loop_t>(my_event_loop_t &loop,
- dprivate::BaseWatcher *watcher, rearm rearmType);
+ dprivate::base_watcher *watcher, rearm rearmType);
template <typename, typename> friend class dprivate::fd_watcher_impl;
template <typename, typename> friend class dprivate::bidi_fd_watcher_impl;
template <typename T, typename U> using EventDispatch = dprivate::EventDispatch<T,U>;
template <typename T> using waitqueue = dprivate::waitqueue<T>;
template <typename T> using waitqueue_node = dprivate::waitqueue_node<T>;
- using BaseWatcher = dprivate::BaseWatcher;
- using BaseSignalWatcher = dprivate::BaseSignalWatcher<T_Mutex,LoopTraits>;
- using BaseFdWatcher = dprivate::BaseFdWatcher<T_Mutex>;
- using BaseBidiFdWatcher = dprivate::BaseBidiFdWatcher<T_Mutex>;
- using BaseChildWatcher = dprivate::BaseChildWatcher<T_Mutex>;
- using BaseTimerWatcher = dprivate::BaseTimerWatcher<T_Mutex>;
- using WatchType = dprivate::WatchType;
+ using BaseWatcher = dprivate::base_watcher;
+ using BaseSignalWatcher = dprivate::base_signal_watcher<T_Mutex,LoopTraits>;
+ using BaseFdWatcher = dprivate::base_fd_watcher<T_Mutex>;
+ using BaseBidiFdWatcher = dprivate::base_bidi_fd_watcher<T_Mutex>;
+ using BaseChildWatcher = dprivate::base_child_watcher<T_Mutex>;
+ using BaseTimerWatcher = dprivate::base_timer_watcher<T_Mutex>;
+ using watch_type_t = dprivate::watch_type_t;
Loop<EventDispatch<T_Mutex, LoopTraits>> loop_mech;
loop_mech.removeSignalWatch(signo);
waitqueue_node<T_Mutex> qnode;
- getAttnLock(qnode);
+ get_attn_lock(qnode);
EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
ed.issueDelete(callBack);
- releaseLock(qnode);
+ release_lock(qnode);
}
- void registerFd(BaseFdWatcher *callback, int fd, int eventmask, bool enabled)
+ void registerFd(BaseFdWatcher *callback, int fd, int eventmask, bool enabled, bool emulate = false)
{
loop_mech.prepare_watcher(callback);
try {
- loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, enabled);
+ if (! loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, enabled, emulate)) {
+ callback->emulatefd = true;
+ callback->emulate_enabled = enabled;
+ if (enabled) {
+ callback->event_flags = eventmask & IO_EVENTS;
+ if (eventmask & IO_EVENTS) {
+ requeue_watcher(callback);
+ }
+ }
+ }
}
catch (...) {
loop_mech.release_watcher(callback);
}
}
- void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask)
+ void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask, bool emulate = false)
{
loop_mech.prepare_watcher(callback);
try {
loop_mech.prepare_watcher(&callback->outWatcher);
try {
if (LoopTraits::has_separate_rw_fd_watches) {
- loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT);
+ int r = loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT, emulate);
+ if (r & IN_EVENTS) {
+ callback->emulatefd = true;
+ if (eventmask & IN_EVENTS) {
+ requeue_watcher(callback);
+ }
+ }
+ if (r & OUT_EVENTS) {
+ callback->outWatcher.emulatefd = true;
+ if (eventmask & OUT_EVENTS) {
+ requeue_watcher(&callback->outWatcher);
+ }
+ }
}
else {
- loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT);
+ if (! loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, true, emulate)) {
+ callback->emulatefd = true;
+ callback->outWatcher.emulatefd = true;
+ if (eventmask & IN_EVENTS) {
+ requeue_watcher(callback);
+ }
+ if (eventmask & OUT_EVENTS) {
+ requeue_watcher(&callback->outWatcher);
+ }
+ }
}
}
catch (...) {
void deregister(BaseFdWatcher *callback, int fd) noexcept
{
- loop_mech.removeFdWatch(fd, callback->watch_flags);
+ if (callback->emulatefd) {
+ auto & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
+ ed.issueDelete(callback);
+ return;
+ }
+ loop_mech.removeFdWatch(fd, callback->watch_flags);
+
waitqueue_node<T_Mutex> qnode;
- getAttnLock(qnode);
+ get_attn_lock(qnode);
- EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
+ auto & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
ed.issueDelete(callback);
- releaseLock(qnode);
+ release_lock(qnode);
}
void deregister(BaseBidiFdWatcher *callback, int fd) noexcept
}
waitqueue_node<T_Mutex> qnode;
- getAttnLock(qnode);
+ get_attn_lock(qnode);
EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
ed.issueDelete(callback);
- releaseLock(qnode);
+ release_lock(qnode);
}
void reserveChildWatch(BaseChildWatcher *callback)
loop_mech.removeChildWatch(child);
waitqueue_node<T_Mutex> qnode;
- getAttnLock(qnode);
+ get_attn_lock(qnode);
EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
ed.issueDelete(callback);
- releaseLock(qnode);
+ release_lock(qnode);
}
void registerTimer(BaseTimerWatcher *callback, clock_type clock)
loop_mech.removeTimer(callback->timer_handle, clock);
waitqueue_node<T_Mutex> qnode;
- getAttnLock(qnode);
+ get_attn_lock(qnode);
EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
ed.issueDelete(callback);
- releaseLock(qnode);
+ release_lock(qnode);
}
- void dequeueWatcher(BaseWatcher *watcher) noexcept
+ void dequeue_watcher(BaseWatcher *watcher) noexcept
{
loop_mech.dequeueWatcher(watcher);
}
- void requeueWatcher(BaseWatcher *watcher) noexcept
+ void requeue_watcher(BaseWatcher *watcher) noexcept
{
loop_mech.queueWatcher(watcher);
}
// Acquire the attention lock (when held, ensures that no thread is polling the AEN
// mechanism). This can be used to safely remove watches, since it is certain that
// notification callbacks won't be run while the attention lock is held.
- void getAttnLock(waitqueue_node<T_Mutex> &qnode) noexcept
+ void get_attn_lock(waitqueue_node<T_Mutex> &qnode) noexcept
{
std::unique_lock<T_Mutex> ulock(wait_lock);
attn_waitqueue.queue(&qnode);
// the attention lock). The poll-wait lock is used to prevent more than a single thread from
// polling the event loop mechanism at a time; if this is not done, it is basically
// impossible to safely deregister watches.
- void getPollwaitLock(waitqueue_node<T_Mutex> &qnode) noexcept
+ void get_pollwait_lock(waitqueue_node<T_Mutex> &qnode) noexcept
{
std::unique_lock<T_Mutex> ulock(wait_lock);
if (attn_waitqueue.isEmpty()) {
}
// Release the poll-wait/attention lock.
- void releaseLock(waitqueue_node<T_Mutex> &qnode) noexcept
+ void release_lock(waitqueue_node<T_Mutex> &qnode) noexcept
{
std::unique_lock<T_Mutex> ulock(wait_lock);
waitqueue_node<T_Mutex> * nhead = attn_waitqueue.unqueue();
// Process rearm return for fd_watcher, including the primary watcher of a bidi_fd_watcher
rearm processFdRearm(BaseFdWatcher * bfw, rearm rearmType, bool is_multi_watch) noexcept
{
+ bool emulatedfd = static_cast<BaseWatcher *>(bfw)->emulatefd;
+
// Called with lock held
if (is_multi_watch) {
BaseBidiFdWatcher * bdfw = static_cast<BaseBidiFdWatcher *>(bfw);
-
+
if (rearmType == rearm::REMOVE) {
bdfw->read_removed = 1;
if (LoopTraits::has_separate_rw_fd_watches) {
bdfw->watch_flags &= ~IN_EVENTS;
- loop_mech.removeFdWatch_nolock(bdfw->watch_fd, IN_EVENTS);
+ if (! emulatedfd) {
+ loop_mech.removeFdWatch_nolock(bdfw->watch_fd, IN_EVENTS);
+ }
return bdfw->write_removed ? rearm::REMOVE : rearm::NOOP;
}
else {
if (! bdfw->write_removed) {
if (bdfw->watch_flags & IN_EVENTS) {
bdfw->watch_flags &= ~IN_EVENTS;
- loop_mech.enableFdWatch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags);
+ if (! emulatedfd) {
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags);
+ }
}
return rearm::NOOP;
}
else {
// both removed: actually remove
- loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */);
+ if (! emulatedfd) {
+ loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */);
+ }
return rearm::REMOVE;
}
}
else if (rearmType == rearm::DISARM) {
bdfw->watch_flags &= ~IN_EVENTS;
- if (! LoopTraits::has_separate_rw_fd_watches) {
- int watch_flags = bdfw->watch_flags;
- // without separate r/w watches, enableFdWatch actually sets
- // which sides are enabled (i.e. can be used to disable):
- loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
- static_cast<BaseWatcher *>(bdfw),
- (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
- }
- else {
- loop_mech.disableFdWatch_nolock(bdfw->watch_fd, IN_EVENTS);
+ if (! emulatedfd) {
+ if (! LoopTraits::has_separate_rw_fd_watches) {
+ int watch_flags = bdfw->watch_flags;
+ // without separate r/w watches, enableFdWatch actually sets
+ // which sides are enabled (i.e. can be used to disable):
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+ static_cast<BaseWatcher *>(bdfw),
+ (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+ }
+ else {
+ loop_mech.disableFdWatch_nolock(bdfw->watch_fd, IN_EVENTS);
+ }
}
}
else if (rearmType == rearm::REARM) {
bdfw->watch_flags |= IN_EVENTS;
- if (! LoopTraits::has_separate_rw_fd_watches) {
- int watch_flags = bdfw->watch_flags;
- loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
- static_cast<BaseWatcher *>(bdfw),
- (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+ if (! emulatedfd) {
+ if (! LoopTraits::has_separate_rw_fd_watches) {
+ int watch_flags = bdfw->watch_flags;
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+ static_cast<BaseWatcher *>(bdfw),
+ (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+ }
+ else {
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+ static_cast<BaseWatcher *>(bdfw),
+ IN_EVENTS | ONE_SHOT);
+ }
}
else {
- loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
- static_cast<BaseWatcher *>(bdfw),
- IN_EVENTS | ONE_SHOT);
+ rearmType = rearm::REQUEUE;
+ }
+ }
+ else if (rearmType == rearm::NOOP) {
+ if (bdfw->emulatefd) {
+ if (bdfw->watch_flags & IN_EVENTS) {
+ rearmType = rearm::REQUEUE;
+ }
}
}
return rearmType;
}
else { // Not multi-watch:
- if (rearmType == rearm::REARM) {
+ if (emulatedfd) {
+ if (rearmType == rearm::REARM) {
+ bfw->emulate_enabled = true;
+ rearmType = rearm::REQUEUE;
+ }
+ else if (rearmType == rearm::DISARM) {
+ bfw->emulate_enabled = false;
+ }
+ else if (rearmType == rearm::NOOP) {
+ if (bfw->emulate_enabled) {
+ rearmType = rearm::REQUEUE;
+ }
+ }
+ }
+ else if (rearmType == rearm::REARM) {
loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw,
(bfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
}
}
// Process re-arm for the secondary (output) watcher in a Bi-direction Fd watcher.
- rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, rearm rearmType) noexcept
+ rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, BaseWatcher * outw, rearm rearmType) noexcept
{
+ bool emulatedfd = outw->emulatefd;
+
// Called with lock held
- if (rearmType == rearm::REMOVE) {
+ if (emulatedfd) {
+ if (rearmType == rearm::REMOVE) {
+ bdfw->write_removed = 1;
+ bdfw->watch_flags &= ~OUT_EVENTS;
+ rearmType = bdfw->read_removed ? rearm::REMOVE : rearm::NOOP;
+ }
+ else if (rearmType == rearm::DISARM) {
+ bdfw->watch_flags &= ~OUT_EVENTS;
+ }
+ else if (rearmType == rearm::REARM) {
+ bdfw->watch_flags |= OUT_EVENTS;
+ rearmType = rearm::REQUEUE;
+ }
+ else if (rearmType == rearm::NOOP) {
+ if (bdfw->watch_flags & OUT_EVENTS) {
+ rearmType = rearm::REQUEUE;
+ }
+ }
+ return rearmType;
+ }
+ else if (rearmType == rearm::REMOVE) {
bdfw->write_removed = 1;
-
+
if (LoopTraits::has_separate_rw_fd_watches) {
bdfw->watch_flags &= ~OUT_EVENTS;
loop_mech.removeFdWatch_nolock(bdfw->watch_fd, OUT_EVENTS);
// (Above variables are initialised only to silence compiler warnings).
- if (pqueue->watchType == WatchType::SECONDARYFD) {
+ if (pqueue->watchType == watch_type_t::SECONDARYFD) {
// construct a pointer to the main watcher:
char * rp = (char *)pqueue;
+ _Pragma ("GCC diagnostic push")
+ _Pragma ("GCC diagnostic ignored \"-Winvalid-offsetof\"")
rp -= offsetof(BaseBidiFdWatcher, outWatcher);
+ _Pragma ("GCC diagnostic pop")
bbfw = (BaseBidiFdWatcher *)rp;
// issue a secondary dispatch:
return active;
}
-
public:
using mutex_t = T_Mutex;
template <typename D> using child_proc_watcher_impl = dprivate::child_proc_watcher_impl<my_event_loop_t, D>;
template <typename D> using timer_impl = dprivate::timer_impl<my_event_loop_t, D>;
+ // Poll the event loop and process any pending events. If no events are pending, wait
+ // for and process at least one event.
void run() noexcept
{
// Poll the mechanism first, in case high-priority events are pending:
waitqueue_node<T_Mutex> qnode;
- getPollwaitLock(qnode);
+ get_pollwait_lock(qnode);
loop_mech.pullEvents(false);
- releaseLock(qnode);
+ release_lock(qnode);
while (! processEvents()) {
// Pull events from the AEN mechanism and insert them in our internal queue:
- getPollwaitLock(qnode);
+ get_pollwait_lock(qnode);
loop_mech.pullEvents(true);
- releaseLock(qnode);
+ release_lock(qnode);
}
}
+ // Poll the event loop and process any pending events
void poll() noexcept
{
- // Poll the mechanism first, in case high-priority events are pending:
waitqueue_node<T_Mutex> qnode;
- getPollwaitLock(qnode);
+ get_pollwait_lock(qnode);
loop_mech.pullEvents(false);
- releaseLock(qnode);
+ release_lock(qnode);
processEvents();
}
+
+ // Get the current time corresponding to a specific clock.
+ // ts - the timespec variable to receive the time
+ // clock - specifies the clock
+ // force_update (default = false) - if true, the time returned will be updated from
+ // the system rather than being a previously cached result. It may be more
+ // accurate, but note that reading from a system clock may be relatively expensive.
+ void get_time(timespec &ts, clock_type clock, bool force_update = false) noexcept
+ {
+ loop_mech.get_time(ts, clock, force_update);
+ }
};
typedef event_loop<null_mutex> event_loop_n;
// Posix signal event watcher
template <typename EventLoop>
-class signal_watcher : private dprivate::BaseSignalWatcher<typename EventLoop::mutex_t, typename EventLoop::loop_traits_t>
+class signal_watcher : private dprivate::base_signal_watcher<typename EventLoop::mutex_t, typename EventLoop::loop_traits_t>
{
template <typename, typename> friend class signal_watcher_impl;
- using BaseWatcher = dprivate::BaseWatcher;
+ using BaseWatcher = dprivate::base_watcher;
using T_Mutex = typename EventLoop::mutex_t;
public:
- using siginfo_p = typename dprivate::BaseSignalWatcher<T_Mutex, typename EventLoop::loop_traits_t>::siginfo_p;
+ using siginfo_p = typename dprivate::base_signal_watcher<T_Mutex, typename EventLoop::loop_traits_t>::siginfo_p;
// Register this watcher to watch the specified signal.
// If an attempt is made to register with more than one event loop at
// Posix file descriptor event watcher
template <typename EventLoop>
-class fd_watcher : private dprivate::BaseFdWatcher<typename EventLoop::mutex_t>
+class fd_watcher : private dprivate::base_fd_watcher<typename EventLoop::mutex_t>
{
template <typename, typename> friend class fd_watcher_impl;
- using BaseWatcher = dprivate::BaseWatcher;
+ using BaseWatcher = dprivate::base_watcher;
using T_Mutex = typename EventLoop::mutex_t;
protected:
this->priority = prio;
this->watch_fd = fd;
this->watch_flags = flags;
- eloop.registerFd(this, fd, flags, enabled);
+ eloop.registerFd(this, fd, flags, enabled, true);
+ }
+
+ void add_watch_noemu(EventLoop &eloop, int fd, int flags, bool enabled = true, int prio = DEFAULT_PRIORITY)
+ {
+ BaseWatcher::init();
+ this->priority = prio;
+ this->watch_fd = fd;
+ this->watch_flags = flags;
+ eloop.registerFd(this, fd, flags, enabled, false);
}
int get_watched_fd()
void set_enabled(EventLoop &eloop, bool enable) noexcept
{
std::lock_guard<T_Mutex> guard(eloop.getBaseLock());
- eloop.setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable);
+ if (this->emulatefd) {
+ this->emulate_enabled = enable;
+ }
+ else {
+ eloop.setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable);
+ }
if (! enable) {
- eloop.dequeueWatcher(this);
+ eloop.dequeue_watcher(this);
}
}
template <typename T>
static fd_watcher<EventLoop> *add_watch(EventLoop &eloop, int fd, int flags, T watchHndlr)
{
- class LambdaFdWatcher : public fd_watcher_impl<EventLoop, LambdaFdWatcher>
+ class lambda_fd_watcher : public fd_watcher_impl<EventLoop, lambda_fd_watcher>
{
private:
T watchHndlr;
public:
- LambdaFdWatcher(T watchHandlr_a) : watchHndlr(watchHandlr_a)
+ lambda_fd_watcher(T watchHandlr_a) : watchHndlr(watchHandlr_a)
{
//
}
}
};
- LambdaFdWatcher * lfd = new LambdaFdWatcher(watchHndlr);
+ lambda_fd_watcher * lfd = new lambda_fd_watcher(watchHndlr);
lfd->add_watch(eloop, fd, flags);
return lfd;
}
void dispatch(void *loop_ptr) noexcept override
{
EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
+
+ // In case emulating, clear enabled here; REARM or explicit set_enabled will re-enable.
+ this->emulate_enabled = false;
+
loop.getBaseLock().unlock();
auto rearmType = static_cast<Derived *>(this)->fd_event(loop, this->watch_fd, this->event_flags);
// This watcher type has two event notification methods which can both potentially be
// active at the same time.
template <typename EventLoop>
-class bidi_fd_watcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::mutex_t>
+class bidi_fd_watcher : private dprivate::base_bidi_fd_watcher<typename EventLoop::mutex_t>
{
template <typename, typename> friend class bidi_fd_watcher_impl;
- using BaseWatcher = dprivate::BaseWatcher;
+ using BaseWatcher = dprivate::base_watcher;
using T_Mutex = typename EventLoop::mutex_t;
void set_watch_enabled(EventLoop &eloop, bool in, bool b)
else {
this->watch_flags &= ~events;
}
- if (EventLoop::loop_traits_t::has_separate_rw_fd_watches) {
- dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher;
- eloop.setFdEnabled_nolock(watcher, this->watch_fd, events | ONE_SHOT, b);
- if (! b) {
- eloop.dequeueWatcher(watcher);
+
+ dprivate::base_watcher * watcher = in ? this : &this->outWatcher;
+
+ if (! basewatcher_get_emulatefd(*watcher)) {
+ if (EventLoop::loop_traits_t::has_separate_rw_fd_watches) {
+ eloop.setFdEnabled_nolock(watcher, this->watch_fd, events | ONE_SHOT, b);
}
- }
- else {
- eloop.setFdEnabled_nolock(this, this->watch_fd,
- (this->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT,
- (this->watch_flags & (IN_EVENTS | OUT_EVENTS)) != 0);
- if (! b) {
- dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher;
- eloop.dequeueWatcher(watcher);
+ else {
+ eloop.setFdEnabled_nolock(this, this->watch_fd,
+ (this->watch_flags & IO_EVENTS) | ONE_SHOT,
+ (this->watch_flags & IO_EVENTS) != 0);
}
}
+
+ if (! b) {
+ eloop.dequeue_watcher(watcher);
+ }
}
public:
void set_watches(EventLoop &eloop, int newFlags)
{
std::lock_guard<T_Mutex> guard(eloop.getBaseLock());
- if (EventLoop::loop_traits_t::has_separate_rw_fd_watches) {
+ bool use_emulation = this->emulatefd || basewatcher_get_emulatefd(this->outWatcher);
+ if (use_emulation || EventLoop::loop_traits_t::has_separate_rw_fd_watches) {
set_watch_enabled(eloop, true, (newFlags & IN_EVENTS) != 0);
set_watch_enabled(eloop, false, (newFlags & OUT_EVENTS) != 0);
}
else {
this->watch_flags = (this->watch_flags & ~IO_EVENTS) | newFlags;
- eloop.setFdEnabled((dprivate::BaseWatcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true);
+ eloop.setFdEnabled((dprivate::base_watcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true);
}
}
this->write_removed = false;
this->priority = inprio;
this->set_priority(this->outWatcher, outprio);
- eloop.registerFd(this, fd, flags);
+ eloop.registerFd(this, fd, flags, true);
}
-
+
+ void add_watch_noemu(EventLoop &eloop, int fd, int flags, int inprio = DEFAULT_PRIORITY, int outprio = DEFAULT_PRIORITY)
+ {
+ BaseWatcher::init();
+ this->outWatcher.BaseWatcher::init();
+ this->watch_fd = fd;
+ this->watch_flags = flags | dprivate::multi_watch;
+ this->read_removed = false;
+ this->write_removed = false;
+ this->priority = inprio;
+ this->set_priority(this->outWatcher, outprio);
+ eloop.registerFd(this, fd, flags, false);
+ }
+
int get_watched_fd()
{
return this->watch_fd;
void dispatch(void *loop_ptr) noexcept override
{
EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
+ this->emulate_enabled = false;
loop.getBaseLock().unlock();
auto rearmType = static_cast<Derived *>(this)->read_ready(loop, this->watch_fd);
rearmType = rearm::REMOVE;
}
- rearmType = loop.processSecondaryRearm(this, rearmType);
+ rearmType = loop.processSecondaryRearm(this, &outwatcher, rearmType);
if (rearmType == rearm::REQUEUE) {
post_dispatch(loop, &outwatcher, rearmType);
// Child process event watcher
template <typename EventLoop>
-class child_proc_watcher : private dprivate::BaseChildWatcher<typename EventLoop::mutex_t>
+class child_proc_watcher : private dprivate::base_child_watcher<typename EventLoop::mutex_t>
{
template <typename, typename> friend class child_proc_watcher_impl;
- using BaseWatcher = dprivate::BaseWatcher;
+ using BaseWatcher = dprivate::base_watcher;
using T_Mutex = typename EventLoop::mutex_t;
public:
// Returns:
// - the child pid in the parent
// - 0 in the child
- pid_t fork(EventLoop &eloop, bool from_reserved = false)
+ pid_t fork(EventLoop &eloop, bool from_reserved = false, int prio = DEFAULT_PRIORITY)
{
+ BaseWatcher::init();
+ this->priority = prio;
+
if (EventLoop::loop_traits_t::supports_childwatch_reservation) {
// Reserve a watch, fork, then claim reservation
if (! from_reserved) {
}
// Register this watcher.
+ this->watch_pid = child;
eloop.registerReservedChild_nolock(this, child);
lock.unlock();
return child;
// Register this watcher.
try {
+ this->watch_pid = child;
eloop.registerChild(this, child);
// Continue in child (it doesn't matter what is written):
EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
loop.getBaseLock().unlock();
- auto rearmType = static_cast<Derived *>(this)->child_status(loop, this->watch_pid, this->child_status);
+ auto rearmType = static_cast<Derived *>(this)->status_change(loop, this->watch_pid, this->child_status);
loop.getBaseLock().lock();
};
template <typename EventLoop>
-class timer : private BaseTimerWatcher<typename EventLoop::mutex_t>
+class timer : private base_timer_watcher<typename EventLoop::mutex_t>
{
template <typename, typename> friend class timer_impl;
- using base_t = BaseTimerWatcher<typename EventLoop::mutex_t>;
+ using base_t = base_timer_watcher<typename EventLoop::mutex_t>;
public:
void add_timer(EventLoop &eloop, clock_type clock = clock_type::MONOTONIC, int prio = DEFAULT_PRIORITY)
{
+ base_watcher::init();
this->priority = prio;
this->clock = clock;
eloop.registerTimer(this, clock);