namespace dasynq {
#ifdef __APPLE__
-int pipe2(int filedes[2], int flags)
+inline int pipe2(int filedes[2], int flags)
{
if (pipe(filedes) == -1) {
return -1;
}
if (flags & O_CLOEXEC) {
- fcntl(filedes[0], F_SETFD, FD_CLOEXEC);
- fcntl(filedes[1], F_SETFD, FD_CLOEXEC);
+ fcntl(filedes[0], F_SETFD, FD_CLOEXEC);
+ fcntl(filedes[1], F_SETFD, FD_CLOEXEC);
}
if (flags & O_NONBLOCK) {
- fcntl(filedes[0], F_SETFL, O_NONBLOCK);
- fcntl(filedes[1], F_SETFL, O_NONBLOCK);
+ fcntl(filedes[0], F_SETFL, O_NONBLOCK);
+ fcntl(filedes[1], F_SETFL, O_NONBLOCK);
}
return 0;
/** Remove the event watcher (and call "removed" callback) */
REMOVE,
/** The watcher has been removed - don't touch it! */
- REMOVED
-// TODO: add a REQUEUE option, which means, "I didn't complete input/output, run me again soon"
+ REMOVED,
+ /** RE-queue the watcher to have its notification called again */
+ REQUEUE
};
// Forward declarations:
{
template <typename T_Mutex, typename Traits> friend class EventDispatch;
template <typename T_Mutex, template <typename> class, typename> friend class dasynq::event_loop;
+ friend inline void basewatcher_set_active(BaseWatcher &watcher, bool active);
+ friend inline bool basewatcher_get_deleteme(const BaseWatcher &watcher);
protected:
WatchType watchType;
}
};
+ inline void basewatcher_set_active(BaseWatcher &watcher, bool active)
+ {
+ watcher.active = active;
+ }
+
+ inline bool basewatcher_get_deleteme(const BaseWatcher &watcher)
+ {
+ return watcher.deleteme;
+ }
+
// Base signal event - not part of public API
template <typename T_Mutex, typename Traits>
class BaseSignalWatcher : public BaseWatcher
}
};
+ // Do standard post-dispatch processing for a watcher. This handles the case of removing or
+ // re-queing watchers depending on the rearm type.
+ template <typename Loop> void post_dispatch(Loop &loop, BaseWatcher *watcher, rearm rearmType)
+ {
+ if (rearmType == rearm::REMOVE) {
+ loop.getBaseLock().unlock();
+ watcher->watch_removed();
+ loop.getBaseLock().lock();
+ }
+ else if (rearmType == rearm::REQUEUE) {
+ loop.requeueWatcher(watcher);
+ }
+ }
+
// This class serves as the base class (mixin) for the AEN mechanism class.
//
// The EventDispatch class maintains the queued event data structures. It inserts watchers
- // into the queue when eventes are received (receiveXXX methods).
+ // into the queue when events are received (receiveXXX methods).
template <typename T_Mutex, typename Traits> class EventDispatch : public Traits
{
template <typename, template <typename> class, typename> friend class dasynq::event_loop;
event_queue.insert(bwatcher->heap_handle, bwatcher->priority);
}
- bool isQueued(BaseWatcher *bwatcher)
+ bool isQueued(BaseWatcher *bwatcher) noexcept
{
return event_queue.is_queued(bwatcher->heap_handle);
}
- void dequeueWatcher(BaseWatcher *bwatcher)
+ void dequeueWatcher(BaseWatcher *bwatcher) noexcept
{
if (event_queue.is_queued(bwatcher->heap_handle)) {
event_queue.remove(bwatcher->heap_handle);
}
}
-
+
// Remove watcher from the queueing system
- void release_watcher(BaseWatcher *bwatcher)
+ void release_watcher(BaseWatcher *bwatcher) noexcept
{
event_queue.deallocate(bwatcher->heap_handle);
}
protected:
T_Mutex lock;
- template <typename T> void init(T *loop) { }
+ template <typename T> void init(T *loop) noexcept { }
// Receive a signal; return true to disable signal watch or false to leave enabled
template <typename T>
- bool receiveSignal(T &loop_mech, typename Traits::SigInfo & siginfo, void * userdata)
+ bool receiveSignal(T &loop_mech, typename Traits::SigInfo & siginfo, void * userdata) noexcept
{
BaseSignalWatcher * bwatcher = static_cast<BaseSignalWatcher *>(userdata);
bwatcher->siginfo = siginfo;
}
template <typename T>
- void receiveFdEvent(T &loop_mech, typename Traits::FD_r fd_r, void * userdata, int flags)
+ void receiveFdEvent(T &loop_mech, typename Traits::FD_r fd_r, void * userdata, int flags) noexcept
{
BaseFdWatcher * bfdw = static_cast<BaseFdWatcher *>(userdata);
}
}
- void receiveChildStat(pid_t child, int status, void * userdata)
+ void receiveChildStat(pid_t child, int status, void * userdata) noexcept
{
BaseChildWatcher * watcher = static_cast<BaseChildWatcher *>(userdata);
watcher->child_status = status;
queueWatcher(watcher);
}
- void receiveTimerExpiry(timer_handle_t & timer_handle, void * userdata, int intervals)
+ void receiveTimerExpiry(timer_handle_t & timer_handle, void * userdata, int intervals) noexcept
{
BaseTimerWatcher * watcher = static_cast<BaseTimerWatcher *>(userdata);
watcher->intervals = intervals;
queueWatcher(watcher);
}
- // Pull a single event from the queue
- BaseWatcher * pullEvent()
+ // Pull a single event from the queue; returns nullptr if the queue is empty.
+ BaseWatcher * pullEvent() noexcept
{
if (event_queue.empty()) {
return nullptr;
friend class dprivate::child_proc_watcher<my_event_loop_t>;
friend class dprivate::timer<my_event_loop_t>;
+ friend void dprivate::post_dispatch<my_event_loop_t>(my_event_loop_t &loop,
+ dprivate::BaseWatcher *watcher, rearm rearmType);
+
template <typename, typename> friend class dprivate::fd_watcher_impl;
template <typename, typename> friend class dprivate::bidi_fd_watcher_impl;
template <typename, typename> friend class dprivate::signal_watcher_impl;
waitqueue<T_Mutex> attn_waitqueue;
waitqueue<T_Mutex> wait_waitqueue;
- T_Mutex &getBaseLock()
+ T_Mutex &getBaseLock() noexcept
{
return loop_mech.lock;
}
}
}
- void setFdEnabled(BaseWatcher *watcher, int fd, int watch_flags, bool enabled)
+ void setFdEnabled(BaseWatcher *watcher, int fd, int watch_flags, bool enabled) noexcept
{
if (enabled) {
loop_mech.enableFdWatch(fd, watcher, watch_flags | ONE_SHOT);
}
}
- void setFdEnabled_nolock(BaseWatcher *watcher, int fd, int watch_flags, bool enabled)
+ void setFdEnabled_nolock(BaseWatcher *watcher, int fd, int watch_flags, bool enabled) noexcept
{
if (enabled) {
loop_mech.enableFdWatch_nolock(fd, watcher, watch_flags | ONE_SHOT);
}
}
- void deregister(BaseFdWatcher *callback, int fd)
+ void deregister(BaseFdWatcher *callback, int fd) noexcept
{
loop_mech.removeFdWatch(fd, callback->watch_flags);
releaseLock(qnode);
}
- void deregister(BaseBidiFdWatcher *callback, int fd)
+ void deregister(BaseBidiFdWatcher *callback, int fd) noexcept
{
if (LoopTraits::has_separate_rw_fd_watches) {
loop_mech.removeBidiFdWatch(fd);
}
}
- void unreserve(BaseChildWatcher *callback)
+ void unreserve(BaseChildWatcher *callback) noexcept
{
loop_mech.unreserveChildWatch();
loop_mech.release_watcher(callback);
loop_mech.addReservedChildWatch_nolock(child, callBack);
}
- void deregister(BaseChildWatcher *callback, pid_t child)
+ void deregister(BaseChildWatcher *callback, pid_t child) noexcept
{
loop_mech.removeChildWatch(child);
}
}
- void setTimer(BaseTimerWatcher *callBack, struct timespec &timeout, clock_type clock)
+ void setTimer(BaseTimerWatcher *callBack, struct timespec &timeout, clock_type clock) noexcept
{
struct timespec interval {0, 0};
loop_mech.setTimer(callBack->timer_handle, timeout, interval, true, clock);
}
- void setTimer(BaseTimerWatcher *callBack, struct timespec &timeout, struct timespec &interval, clock_type clock)
+ void setTimer(BaseTimerWatcher *callBack, struct timespec &timeout, struct timespec &interval, clock_type clock) noexcept
{
loop_mech.setTimer(callBack->timer_handle, timeout, interval, true, clock);
}
- void setTimerRel(BaseTimerWatcher *callBack, struct timespec &timeout, clock_type clock)
+ void setTimerRel(BaseTimerWatcher *callBack, struct timespec &timeout, clock_type clock) noexcept
{
struct timespec interval {0, 0};
loop_mech.setTimerRel(callBack->timer_handle, timeout, interval, true, clock);
}
- void setTimerRel(BaseTimerWatcher *callBack, struct timespec &timeout, struct timespec &interval, clock_type clock)
+ void setTimerRel(BaseTimerWatcher *callBack, struct timespec &timeout, struct timespec &interval, clock_type clock) noexcept
{
loop_mech.setTimerRel(callBack->timer_handle, timeout, interval, true, clock);
}
- void stop_timer(BaseTimerWatcher *callback, clock_type clock)
+ void stop_timer(BaseTimerWatcher *callback, clock_type clock) noexcept
{
loop_mech.stop_timer(callback->timer_handle, clock);
}
- void deregister(BaseTimerWatcher *callback, clock_type clock)
+ void deregister(BaseTimerWatcher *callback, clock_type clock) noexcept
{
loop_mech.removeTimer(callback->timer_handle, clock);
loop_mech.dequeueWatcher(watcher);
}
+ void requeueWatcher(BaseWatcher *watcher) noexcept
+ {
+ loop_mech.queueWatcher(watcher);
+ }
+
// Acquire the attention lock (when held, ensures that no thread is polling the AEN
- // mechanism).
- void getAttnLock(waitqueue_node<T_Mutex> &qnode)
+ // mechanism). This can be used to safely remove watches, since it is certain that
+ // notification callbacks won't be run while the attention lock is held.
+ void getAttnLock(waitqueue_node<T_Mutex> &qnode) noexcept
{
std::unique_lock<T_Mutex> ulock(wait_lock);
attn_waitqueue.queue(&qnode);
}
}
- // Acquire the poll-wait lock (to be held when polling the AEN mechanism; lower
- // priority than the attention lock).
- void getPollwaitLock(waitqueue_node<T_Mutex> &qnode)
+ // Acquire the poll-wait lock (to be held when polling the AEN mechanism; lower priority than
+ // the attention lock). The poll-wait lock is used to prevent more than a single thread from
+ // polling the event loop mechanism at a time; if this is not done, it is basically
+ // impossible to safely deregister watches.
+ void getPollwaitLock(waitqueue_node<T_Mutex> &qnode) noexcept
{
std::unique_lock<T_Mutex> ulock(wait_lock);
if (attn_waitqueue.isEmpty()) {
}
// Release the poll-wait/attention lock.
- void releaseLock(waitqueue_node<T_Mutex> &qnode)
+ void releaseLock(waitqueue_node<T_Mutex> &qnode) noexcept
{
std::unique_lock<T_Mutex> ulock(wait_lock);
waitqueue_node<T_Mutex> * nhead = attn_waitqueue.unqueue();
}
}
- void processSignalRearm(BaseSignalWatcher * bsw, rearm rearmType)
+ void processSignalRearm(BaseSignalWatcher * bsw, rearm rearmType) noexcept
{
// Called with lock held
if (rearmType == rearm::REARM) {
else if (rearmType == rearm::REMOVE) {
loop_mech.removeSignalWatch_nolock(bsw->siginfo.get_signo());
}
+ // Note that signal watchers cannot (currently) be disarmed
}
// Process rearm return for fd_watcher, including the primary watcher of a bidi_fd_watcher
- rearm processFdRearm(BaseFdWatcher * bfw, rearm rearmType, bool is_multi_watch)
+ rearm processFdRearm(BaseFdWatcher * bfw, rearm rearmType, bool is_multi_watch) noexcept
{
- // Called with lock held;
- // bdfw->event_flags contains only with pending (queued) events
-
+ // Called with lock held
if (is_multi_watch) {
BaseBidiFdWatcher * bdfw = static_cast<BaseBidiFdWatcher *>(bfw);
}
}
else if (rearmType == rearm::DISARM) {
- // TODO should actually disarm.
+ bdfw->watch_flags &= ~IN_EVENTS;
+
+ if (! LoopTraits::has_separate_rw_fd_watches) {
+ int watch_flags = bdfw->watch_flags;
+ // without separate r/w watches, enableFdWatch actually sets
+ // which sides are enabled (i.e. can be used to disable):
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+ static_cast<BaseWatcher *>(bdfw),
+ (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+ }
+ else {
+ loop_mech.disableFdWatch_nolock(bdfw->watch_fd, IN_EVENTS);
+ }
}
else if (rearmType == rearm::REARM) {
bdfw->watch_flags |= IN_EVENTS;
loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw,
(bfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
}
+ else if (rearmType == rearm::DISARM) {
+ loop_mech.disableFdWatch_nolock(bfw->watch_fd, bfw->watch_flags);
+ }
else if (rearmType == rearm::REMOVE) {
loop_mech.removeFdWatch_nolock(bfw->watch_fd, bfw->watch_flags);
}
}
// Process re-arm for the secondary (output) watcher in a Bi-direction Fd watcher.
- rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, rearm rearmType)
+ rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, rearm rearmType) noexcept
{
// Called with lock held
if (rearmType == rearm::REMOVE) {
}
}
else if (rearmType == rearm::DISARM) {
- // TODO actually disarm.
+ bdfw->watch_flags &= ~OUT_EVENTS;
+
+ if (! LoopTraits::has_separate_rw_fd_watches) {
+ int watch_flags = bdfw->watch_flags;
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+ static_cast<BaseWatcher *>(bdfw),
+ (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+ }
+ else {
+ loop_mech.disableFdWatch_nolock(bdfw->watch_fd, OUT_EVENTS);
+ }
}
else if (rearmType == rearm::REARM) {
bdfw->watch_flags |= OUT_EVENTS;
return rearmType;
}
- void processTimerRearm(BaseTimerWatcher *btw, rearm rearmType)
+ void processTimerRearm(BaseTimerWatcher *btw, rearm rearmType) noexcept
{
// Called with lock held
if (rearmType == rearm::REARM) {
else if (rearmType == rearm::REMOVE) {
loop_mech.removeTimer_nolock(btw->timer_handle, btw->clock);
}
- // TODO DISARM?
+ else if (rearmType == rearm::DISARM) {
+ loop_mech.enableTimer_nolock(btw->timer_handle, false, btw->clock);
+ }
}
+ // Process all queued events; returns true if any events were processed.
bool processEvents() noexcept
{
EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
void run() noexcept
{
+ // Poll the mechanism first, in case high-priority events are pending:
+ waitqueue_node<T_Mutex> qnode;
+ getPollwaitLock(qnode);
+ loop_mech.pullEvents(false);
+ releaseLock(qnode);
+
while (! processEvents()) {
- waitqueue_node<T_Mutex> qnode;
-
- // We only allow one thread to poll the mechanism at any time, since otherwise
- // removing event watchers is a nightmare beyond comprehension.
- getPollwaitLock(qnode);
-
// Pull events from the AEN mechanism and insert them in our internal queue:
+ getPollwaitLock(qnode);
loop_mech.pullEvents(true);
-
- // Now release the wait lock:
releaseLock(qnode);
}
}
-};
-typedef event_loop<null_mutex> NEventLoop;
-typedef event_loop<std::mutex> TEventLoop;
+ void poll() noexcept
+ {
+ // Poll the mechanism first, in case high-priority events are pending:
+ waitqueue_node<T_Mutex> qnode;
+ getPollwaitLock(qnode);
+ loop_mech.pullEvents(false);
+ releaseLock(qnode);
+
+ processEvents();
+ }
+};
-// from dasync.cc:
-TEventLoop & getSystemLoop();
+typedef event_loop<null_mutex> event_loop_n;
+typedef event_loop<std::mutex> event_loop_th;
namespace dprivate {
loop.processSignalRearm(this, rearmType);
- if (rearmType == rearm::REMOVE) {
- loop.getBaseLock().unlock();
- this->watch_removed();
- loop.getBaseLock().lock();
- }
+ post_dispatch(loop, this, rearmType);
}
}
};
rearmType = loop.processFdRearm(this, rearmType, false);
- if (rearmType == rearm::REMOVE) {
- loop.getBaseLock().unlock();
- this->watch_removed();
- loop.getBaseLock().lock();
- }
+ post_dispatch(loop, this, rearmType);
}
}
};
public:
- // This should never actually get called:
- /*
- rearm fdEvent(EventLoop &eloop, int fd, int flags) final
- {
- return rearm::REARM; // should not be reachable.
- };
- */
-
void set_in_watch_enabled(EventLoop &eloop, bool b) noexcept
{
eloop.getBaseLock().lock();
eloop.deregister(this, this->watch_fd);
}
+ template <typename T>
+ static bidi_fd_watcher<EventLoop> *add_watch(EventLoop &eloop, int fd, int flags, T watchHndlr)
+ {
+ class LambdaBidiWatcher : public bidi_fd_watcher_impl<EventLoop, LambdaBidiWatcher>
+ {
+ private:
+ T watchHndlr;
+
+ public:
+ LambdaBidiWatcher(T watchHandlr_a) : watchHndlr(watchHandlr_a)
+ {
+ //
+ }
+
+ rearm read_ready(EventLoop &eloop, int fd)
+ {
+ return watchHndlr(eloop, fd, IN_EVENTS);
+ }
+
+ rearm write_ready(EventLoop &eloop, int fd)
+ {
+ return watchHndlr(eloop, fd, OUT_EVENTS);
+ }
+
+ void watch_removed() noexcept override
+ {
+ delete this;
+ }
+ };
+
+ LambdaBidiWatcher * lfd = new LambdaBidiWatcher(watchHndlr);
+ lfd->add_watch(eloop, fd, flags);
+ return lfd;
+ }
+
// virtual rearm read_ready(EventLoop &eloop, int fd) noexcept = 0;
// virtual rearm write_ready(EventLoop &eloop, int fd) noexcept = 0;
};
rearmType = loop.processFdRearm(this, rearmType, true);
- if (rearmType == rearm::REMOVE) {
- loop.getBaseLock().unlock();
- this->watch_removed();
- loop.getBaseLock().lock();
- }
+ post_dispatch(loop, this, rearmType);
}
}
void dispatch_second(void *loop_ptr) noexcept override
{
+ auto &outwatcher = bidi_fd_watcher<EventLoop>::outWatcher;
+
EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
loop.getBaseLock().unlock();
if (rearmType != rearm::REMOVED) {
this->event_flags &= ~OUT_EVENTS;
- this->active = false;
- if (this->deleteme) {
+ basewatcher_set_active(outwatcher, false);
+ if (basewatcher_get_deleteme(outwatcher)) {
// We don't want a watch that is marked "deleteme" to re-arm itself.
rearmType = rearm::REMOVE;
}
rearmType = loop.processSecondaryRearm(this, rearmType);
- if (rearmType == rearm::REMOVE) {
- loop.getBaseLock().unlock();
- this->watch_removed();
- loop.getBaseLock().lock();
+ if (rearmType == rearm::REQUEUE) {
+ post_dispatch(loop, &outwatcher, rearmType);
+ }
+ else {
+ post_dispatch(loop, this, rearmType);
}
}
}
// Returns:
// - the child pid in the parent
// - 0 in the child
- pid_t fork(EventLoop &eloop)
+ pid_t fork(EventLoop &eloop, bool from_reserved = false)
{
if (EventLoop::loop_traits_t::supports_childwatch_reservation) {
// Reserve a watch, fork, then claim reservation
- reserve_watch(eloop);
+ if (! from_reserved) {
+ reserve_watch(eloop);
+ }
auto &lock = eloop.getBaseLock();
lock.lock();
}
// rearmType = loop.process??;
-
- if (rearmType == rearm::REMOVE) {
- loop.getBaseLock().unlock();
- this->watch_removed();
- loop.getBaseLock().lock();
- }
+ post_dispatch(loop, this, rearmType);
}
}
};
eloop.deregister(this, this->clock);
}
+ template <typename T>
+ static timer<EventLoop> *add_timer(EventLoop &eloop, clock_type clock, bool relative,
+ struct timespec &timeout, struct timespec &interval, T watchHndlr)
+ {
+ class lambda_timer : public timer_impl<EventLoop, lambda_timer>
+ {
+ private:
+ T watchHndlr;
+
+ public:
+ lambda_timer(T watchHandlr_a) : watchHndlr(watchHandlr_a)
+ {
+ //
+ }
+
+ rearm timer_expiry(EventLoop &eloop, int intervals)
+ {
+ return watchHndlr(eloop, intervals);
+ }
+
+ void watch_removed() noexcept override
+ {
+ delete this;
+ }
+ };
+
+ lambda_timer * lt = new lambda_timer(watchHndlr);
+ lt->add_timer(eloop, clock);
+ if (relative) {
+ lt->arm_timer_rel(eloop, timeout, interval);
+ }
+ else {
+ lt->arm_timer(eloop, timeout, interval);
+ }
+ return lt;
+ }
+
// Timer expired, and the given number of intervals have elapsed before
// expiry event was queued. Normally intervals == 1 to indicate no
// overrun.
loop.processTimerRearm(this, rearmType);
- if (rearmType == rearm::REMOVE) {
- loop.getBaseLock().unlock();
- this->watch_removed();
- loop.getBaseLock().lock();
- }
+ post_dispatch(loop, this, rearmType);
}
}
};