// Forward declarations:
template <typename T_Mutex> class EventLoop;
template <typename T_Mutex> class PosixFdWatcher;
+template <typename T_Mutex> class PosixBidiFdWatcher;
template <typename T_Mutex> class PosixSignalWatcher;
template <typename T_Mutex> class PosixChildWatcher;
int active : 1;
int deleteme : 1;
+ BaseWatcher * prev;
BaseWatcher * next;
public:
- BaseWatcher(WatchType wt) noexcept : watchType(wt), active(0), deleteme(0), next(nullptr) { }
+
+ // Perform initialisation necessary before registration with an event loop
+ void init()
+ {
+ active = false;
+ deleteme = false;
+ prev = nullptr;
+ next = nullptr;
+ }
+
+ BaseWatcher(WatchType wt) noexcept : watchType(wt) { }
virtual ~BaseWatcher() noexcept { }
template <typename, typename Traits> friend class EventDispatch;
friend class dasync::EventLoop<T_Mutex>;
- // The main instance is the "input" watcher only; we keep a secondary watcher
- // with a secondary set of flags for the "output" watcher:
- BaseWatcher outWatcher = BaseWatcher(WatchType::SECONDARYFD);
-
- // This should never actually get called:
- Rearm gotEvent(EventLoop<T_Mutex> * eloop, int fd, int flags)
+ // This should never actually get called:
+ Rearm gotEvent(EventLoop<T_Mutex> * eloop, int fd, int flags) final
{
return Rearm::REARM; // should not be reachable.
};
protected:
+
+ // The main instance is the "input" watcher only; we keep a secondary watcher
+ // with a secondary set of flags for the "output" watcher:
+ BaseWatcher outWatcher = BaseWatcher(WatchType::SECONDARYFD);
+
int read_removed : 1; // read watch removed?
int write_removed : 1; // write watch removed?
public:
- virtual Rearm readReady(EventLoop<T_Mutex> * eloop, int fd) = 0;
- virtual Rearm writeReady(EventLoop<T_Mutex> * eloop, int fd) = 0;
+ virtual Rearm readReady(EventLoop<T_Mutex> * eloop, int fd) noexcept = 0;
+ virtual Rearm writeReady(EventLoop<T_Mutex> * eloop, int fd) noexcept = 0;
};
template <typename T_Mutex>
void queueWatcher(BaseWatcher *bwatcher)
{
- // TODO
- // We can't allow a queued entry to be deleted (due to the single-linked-list used for the queue)
- // so for now, I'll set it active; but this prevents it being deleted until we can next
- // process events, so once we have a proper linked list or better structure should probably
- // remove this:
- bwatcher->active = true;
-
// Put in queue:
- BaseWatcher * prev_first = first;
- first = bwatcher;
- bwatcher->next = prev_first;
+ if (first == nullptr) {
+ bwatcher->prev = bwatcher;
+ bwatcher->next = bwatcher;
+ first = bwatcher;
+ }
+ else {
+ first->prev->next = bwatcher;
+ bwatcher->prev = first->prev;
+ first->prev = bwatcher;
+ bwatcher->next = first;
+ }
+ }
+
+ void dequeueWatcher(BaseWatcher *bwatcher)
+ {
+ if (bwatcher->prev == bwatcher) {
+ // Only item in queue
+ first = nullptr;
+ }
+ else {
+ if (first == bwatcher) first = first->next;
+ bwatcher->prev->next = bwatcher->next;
+ bwatcher->next->prev = bwatcher->prev;
+ }
+
+ bwatcher->prev = nullptr;
+ bwatcher->next = nullptr;
+ }
+
+ bool isQueued(BaseWatcher *bwatcher)
+ {
+ return bwatcher->prev != nullptr;
}
protected:
queueWatcher(bwatcher);
- if (is_multi_watch && bfdw->event_flags != bfdw->watch_flags) {
- // We need to re-enable the other channel now:
- loop_mech.enableFdWatch_nolock(bfdw->watch_fd, userdata,
- (bfdw->watch_flags & ~(bfdw->event_flags)) | one_shot);
+ if (! LoopTraits::has_separate_rw_fd_watches) {
+ // If this is a bidirectional fd-watch, it has been disabled in *both* directions
+ // as the event was delivered. However, the other direction should not be disabled
+ // yet, so we need to re-enable:
+ int in_out_mask = in_events | out_events;
+ if (is_multi_watch && bfdw->event_flags != (bfdw->watch_flags & in_out_mask)) {
+ // We need to re-enable the other channel now:
+ loop_mech.enableFdWatch_nolock(bfdw->watch_fd, userdata,
+ (bfdw->watch_flags & ~(bfdw->event_flags)) | one_shot);
+ }
}
}
queueWatcher(watcher);
}
- // TODO is this needed?:
+ // Pull a single event from the queue
BaseWatcher * pullEvent()
{
- if (first) {
- BaseWatcher * r = first;
- first = first->next;
- return r;
+ BaseWatcher * r = first;
+ if (r != nullptr) {
+ dequeueWatcher(r);
}
- return nullptr;
+ return r;
}
void issueDelete(BaseWatcher *watcher) noexcept
{
// This is only called when the attention lock is held, so if the watcher is not
- // active/queued now, it cannot become active during execution of this function.
+ // active/queued now, it cannot become active (and will not be reported with an event)
+ // during execution of this function.
lock.lock();
}
else {
// Actually do the delete.
+ if (isQueued(watcher)) {
+ dequeueWatcher(watcher);
+ }
+
+ // TODO call this without lock?
watcher->watchRemoved();
}
template <typename T_Mutex> class EventLoop
{
friend class PosixFdWatcher<T_Mutex>;
+ friend class PosixBidiFdWatcher<T_Mutex>;
friend class PosixSignalWatcher<T_Mutex>;
friend class PosixChildWatcher<T_Mutex>;
waitqueue<T_Mutex> attn_waitqueue;
waitqueue<T_Mutex> wait_waitqueue;
+ T_Mutex &getBaseLock()
+ {
+ return loop_mech.lock;
+ }
void registerSignal(BaseSignalWatcher *callBack, int signo)
{
void registerFd(BaseFdWatcher *callback, int fd, int eventmask)
{
- loop_mech.addFdWatch(fd, callback, eventmask);
+ loop_mech.addFdWatch(fd, callback, eventmask | one_shot);
}
- void setEnabled(BaseFdWatcher *callback, int fd, int watch_flags, bool enabled)
+ void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask)
+ {
+ if (LoopTraits::has_separate_rw_fd_watches) {
+ // TODO
+ }
+ else {
+ loop_mech.addFdWatch(fd, callback, eventmask | one_shot);
+ }
+ }
+
+ void setFdEnabled(BaseWatcher *watcher, int fd, int watch_flags, bool enabled)
{
if (enabled) {
- loop_mech.enableFdWatch(fd, static_cast<BaseWatcher *>(callback), watch_flags | one_shot);
+ loop_mech.enableFdWatch(fd, watcher, watch_flags | one_shot);
}
else {
loop_mech.disableFdWatch(fd);
}
}
+
+ void setFdEnabled_nolock(BaseWatcher *watcher, int fd, int watch_flags, bool enabled)
+ {
+ if (enabled) {
+ loop_mech.enableFdWatch_nolock(fd, watcher, watch_flags | one_shot);
+ }
+ else {
+ loop_mech.disableFdWatch_nolock(fd);
+ }
+ }
void deregister(BaseFdWatcher *callback, int fd)
{
releaseLock(qnode);
}
+ void deregister(BaseBidiFdWatcher *callback, int fd)
+ {
+ if (LoopTraits::has_separate_rw_fd_watches) {
+ // TODO
+ }
+ else {
+ loop_mech.removeFdWatch(fd);
+
+ waitqueue_node<T_Mutex> qnode;
+ getAttnLock(qnode);
+
+ EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
+ ed.issueDelete(callback);
+
+ releaseLock(qnode);
+ }
+ }
+
void reserveChildWatch(BaseChildWatcher *callBack)
{
loop_mech.reserveChildWatch();
if (rearmType == Rearm::REMOVE) {
bdfw->read_removed = 1;
bdfw->watch_flags &= ~in_events;
- if (! bdfw->write_removed) {
- return Rearm::NOOP;
+
+ if (! LoopTraits::has_separate_rw_fd_watches) {
+ if (! bdfw->write_removed) {
+ return Rearm::NOOP;
+ }
+ else {
+ // both removed: actually remove
+ loop_mech.removeFdWatch_nolock(bdfw->watch_fd);
+ return Rearm::REMOVE;
+ }
}
else {
- // both removed: actually remove
+ // TODO this will need flags for such a loop, since it can't
+ // otherwise distinguish which channel watch to remove
loop_mech.removeFdWatch_nolock(bdfw->watch_fd);
}
}
}
else if (rearmType == Rearm::REARM) {
bdfw->watch_flags |= in_events;
- loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
- static_cast<BaseWatcher *>(bdfw),
- (bdfw->watch_flags & (in_events | out_events)) | one_shot);
+ if (! LoopTraits::has_separate_rw_fd_watches) {
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+ static_cast<BaseWatcher *>(bdfw),
+ (bdfw->watch_flags & (in_events | out_events)) | one_shot);
+ }
+ else {
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+ static_cast<BaseWatcher *>(bdfw),
+ in_events | one_shot);
+ }
}
return rearmType;
}
if (rearmType == Rearm::REMOVE) {
bdfw->write_removed = 1;
bdfw->watch_flags &= ~out_events;
- if (! bdfw->read_removed) {
- return Rearm::NOOP;
+
+ if (LoopTraits::has_separate_rw_fd_watches) {
+ // TODO this will need flags for such a loop, since it can't
+ // otherwise distinguish which channel watch to remove
+ loop_mech.removeFdWatch_nolock(bdfw->watch_fd);
+ return bdfw->read_removed ? Rearm::REMOVE : Rearm::NOOP;
}
else {
- // both removed: actually remove
- loop_mech.removeFdWatch_nolock(bdfw->watch_fd);
+ if (! bdfw->read_removed) {
+ return Rearm::NOOP;
+ }
+ else {
+ // both removed: actually remove
+ loop_mech.removeFdWatch_nolock(bdfw->watch_fd);
+ return Rearm::REMOVE;
+ }
}
}
else if (rearmType == Rearm::DISARM) {
}
else if (rearmType == Rearm::REARM) {
bdfw->watch_flags |= out_events;
- loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
- static_cast<BaseWatcher *>(bdfw),
- (bdfw->watch_flags & (in_events | out_events)) | one_shot);
+ if (! LoopTraits::has_separate_rw_fd_watches) {
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+ static_cast<BaseWatcher *>(bdfw),
+ (bdfw->watch_flags & (in_events | out_events)) | one_shot);
+ }
+ else {
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+ static_cast<BaseWatcher *>(bdfw),
+ out_events | one_shot);
+ }
}
return rearmType;
}
// So this pulls *all* currently pending events and processes them in the current thread.
// That's probably good for throughput, but maybe the behavior should be configurable.
- BaseWatcher * pqueue = ed.first;
- ed.first = nullptr;
+ BaseWatcher * pqueue = ed.pullEvent();
bool active = false;
- BaseWatcher * prev = nullptr;
- for (BaseWatcher * q = pqueue; q != nullptr; q = q->next) {
- if (q->deleteme) {
- // TODO should this really be called with lock held?
- q->watchRemoved();
- if (prev) {
- prev->next = q->next;
- }
- else {
- pqueue = q->next;
- }
- }
- else {
- q->active = true;
- active = true;
- prev = q;
- }
- }
+ while (pqueue != nullptr) {
- ed.lock.unlock();
+ pqueue->active = true;
+ active = true;
+
+ ed.lock.unlock();
- while (pqueue != nullptr) {
Rearm rearmType = Rearm::NOOP;
bool is_multi_watch = false;
BaseBidiFdWatcher *bbfw = nullptr;
if (is_multi_watch) {
// The primary watcher for a multi-watch watcher is queued for
// read events.
- BaseBidiFdWatcher *bdfw = static_cast<BaseBidiFdWatcher *>(bfw);
- rearmType = bdfw->readReady(this, bfw->watch_fd);
+ bbfw = static_cast<BaseBidiFdWatcher *>(bfw);
+ rearmType = bbfw->readReady(this, bfw->watch_fd);
}
else {
rearmType = bfw->gotEvent(this, bfw->watch_fd, bfw->event_flags);
}
case WatchType::SECONDARYFD: {
// first construct a pointer to the main watcher:
+ is_multi_watch = true;
char * rp = (char *)pqueue;
rp -= offsetof(BaseBidiFdWatcher, outWatcher);
bbfw = (BaseBidiFdWatcher *)rp;
rearmType = processFdRearm(static_cast<BaseFdWatcher *>(pqueue), rearmType, is_multi_watch);
break;
case WatchType::SECONDARYFD:
- processSecondaryRearm(bbfw, rearmType);
+ rearmType = processSecondaryRearm(bbfw, rearmType);
break;
default: ;
}
ed.lock.unlock();
if (rearmType == Rearm::REMOVE) {
- pqueue->watchRemoved();
+ (is_multi_watch ? bbfw : pqueue)->watchRemoved();
}
- pqueue = pqueue->next;
+ ed.lock.lock();
+
+ pqueue = ed.pullEvent();
}
return active;
template <typename T_Mutex>
class PosixSignalWatcher : private dprivate::BaseSignalWatcher<T_Mutex>
{
+ using BaseWatcher = dprivate::BaseWatcher;
+
public:
using SigInfo_p = typename dprivate::BaseSignalWatcher<T_Mutex>::SigInfo_p;
// a time, behaviour is undefined.
inline void registerWatch(EventLoop<T_Mutex> *eloop, int signo)
{
- this->deleteme = false;
+ BaseWatcher::init();
this->siginfo.set_signo(signo);
eloop->registerSignal(this, signo);
}
template <typename T_Mutex>
class PosixFdWatcher : private dprivate::BaseFdWatcher<T_Mutex>
{
+ using BaseWatcher = dprivate::BaseWatcher;
+
protected:
- // Set the types of event to watch. May not be supported for all mechanisms.
- // Only safe to call from within the callback handler (gotEvent).
+ // Set the types of event to watch. Only supported if LoopTraits::has_bidi_fd_watch
+ // is true; otherwise has unspecified behavior.
+ // Only safe to call from within the callback handler (gotEvent). Might not take
+ // effect until the current callback handler returns with REARM.
void setWatchFlags(int newFlags)
{
this->watch_flags = newFlags;
// Can fail with std::bad_alloc or std::system_error.
void registerWith(EventLoop<T_Mutex> *eloop, int fd, int flags)
{
- this->deleteme = false;
+ BaseWatcher::init();
this->watch_fd = fd;
this->watch_flags = flags;
eloop->registerFd(this, fd, flags);
void setEnabled(EventLoop<T_Mutex> *eloop, bool enable) noexcept
{
- eloop->setEnabled(this, this->watch_fd, this->watch_flags, enable);
+ eloop->setFdEnabled(this, this->watch_fd, this->watch_flags, enable);
}
// virtual Rearm gotEvent(EventLoop<T_Mutex> *, int fd, int flags) = 0;
};
+template <typename T_Mutex>
+class PosixBidiFdWatcher : private dprivate::BaseBidiFdWatcher<T_Mutex>
+{
+ using BaseWatcher = dprivate::BaseWatcher;
+
+ void setWatchEnabled(EventLoop<T_Mutex> *eloop, bool in, bool b)
+ {
+ int events = in ? in_events : out_events;
+
+ if (b) {
+ this->watch_flags |= events;
+ }
+ else {
+ this->watch_flags &= ~events;
+ }
+ if (LoopTraits::has_separate_rw_fd_watches) {
+ dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher;
+ eloop->setFdEnabled_nolock(watcher, this->watch_fd, events | one_shot, b);
+ }
+ else {
+ eloop->setFdEnabled_nolock(this, this->watch_fd,
+ (this->watch_flags & (in_events | out_events)) | one_shot,
+ (this->watch_flags & (in_events | out_events)) != 0);
+ }
+ }
+
+ protected:
+
+ void setInWatchEnabled(EventLoop<T_Mutex> *eloop, bool b) noexcept
+ {
+ eloop->getBaseLock().lock();
+ setWatchEnabled(eloop, true, b);
+ eloop->getBaseLock().unlock();
+ }
+
+ void setOutWatchEnabled(EventLoop<T_Mutex> *eloop, bool b) noexcept
+ {
+ eloop->getBaseLock().lock();
+ setWatchEnabled(eloop, false, b);
+ eloop->getBaseLock().unlock();
+ }
+
+ // Set the watch flags, which enables/disables both the in-watch and the out-watch accordingly.
+ // This can only be called from the event handler callbacks (readReady/writeReady) and only if
+ // set{In|Out}WatchEnabled will not be called concurrently.
+ void setWatchFlags(EventLoop<T_Mutex> * eloop, int newFlags)
+ {
+ if (LoopTraits::has_separate_rw_fd_watches) {
+ // (In this case, this function is fully thread-safe)
+ std::lock_guard<T_Mutex> guard(eloop->getBaseLock());
+ setWatchEnabled(eloop, true, (newFlags & in_events) != 0);
+ setWatchEnabled(eloop, false, (newFlags & out_events) != 0);
+ }
+ else {
+ this->watch_flags = (this->watch_flags & ~IO_EVENTS) | newFlags;
+ eloop->setFdEnabled((dprivate::BaseWatcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true);
+ }
+ }
+
+ public:
+
+ // Register a bi-direction file descriptor watcher with an event loop. Flags
+ // can be any combination of dasync::in_events / dasync::out_events.
+ //
+ // Can fail with std::bad_alloc or std::system_error.
+ void registerWith(EventLoop<T_Mutex> *eloop, int fd, int flags)
+ {
+ BaseWatcher::init();
+ this->outWatcher.BaseWatcher::init();
+ this->watch_fd = fd;
+ this->watch_flags = flags | dprivate::multi_watch;
+ eloop->registerFd(this, fd, flags);
+ }
+
+ // Deregister a bi-direction file descriptor watcher.
+ //
+ // If other threads may be polling the event loop, it is not safe to assume
+ // the watcher is unregistered until the watchRemoved() callback is issued
+ // (which will not occur until the event handler returns, if it is active).
+ // In a single threaded environment, it is safe to delete the watcher after
+ // calling this method as long as the handler (if it is active) accesses no
+ // internal state and returns Rearm::REMOVED.
+ // TODO: implement REMOVED, or correct above statement.
+ void deregisterWatch(EventLoop<T_Mutex> *eloop) noexcept
+ {
+ eloop->deregister(this, this->watch_fd);
+ }
+
+ // Rearm readReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
+ // Rearm writeReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
+};
+
// Posix child process event watcher
template <typename T_Mutex>
class PosixChildWatcher : private dprivate::BaseChildWatcher<T_Mutex>
{
+ using BaseWatcher = dprivate::BaseWatcher;
+
public:
// Reserve resources for a child watcher with the given event loop.
// Reservation can fail with std::bad_alloc.
// Registration can fail with std::bad_alloc.
void registerWith(EventLoop<T_Mutex> *eloop, pid_t child)
{
- this->deleteme = false;
+ BaseWatcher::init();
this->watch_pid = child;
eloop->registerChild(this, child);
}
// Registration cannot fail.
void registerReserved(EventLoop<T_Mutex> *eloop, pid_t child) noexcept
{
+ BaseWatcher::init();
eloop->registerReservedChild(this, child);
}