$(CXX) -o dinit-reboot dinit-reboot.o
$(objects): %.o: %.cc service.h dinit-log.h control.h control-cmds.h cpbuffer.h
- $(CXX) $(CXXOPTS) -Idasync -c $< -o $@
+ $(CXX) $(CXXOPTS) -Idasynq -c $< -o $@
#install: all
char outbuf[] = { DINIT_RP_BADREQ };
if (! queuePacket(outbuf, 1)) return false;
bad_conn_close = true;
- iob.setWatchFlags(out_events);
+ iob.setWatchFlags(OUT_EVENTS);
}
return true;
}
char badreqRep[] = { DINIT_RP_BADREQ };
if (! queuePacket(badreqRep, 1)) return false;
bad_conn_close = true;
- iob.setWatchFlags(out_events);
+ iob.setWatchFlags(OUT_EVENTS);
return true;
}
char badreqRep[] = { DINIT_RP_BADREQ };
if (! queuePacket(badreqRep, 1)) return false;
bad_conn_close = true;
- iob.setWatchFlags(out_events);
+ iob.setWatchFlags(OUT_EVENTS);
return true;
}
else {
char badreqRep[] = { DINIT_RP_BADREQ };
if (! queuePacket(badreqRep, 1)) return false;
bad_conn_close = true;
- iob.setWatchFlags(out_events);
+ iob.setWatchFlags(OUT_EVENTS);
return true;
}
else {
bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept
{
- int in_flag = bad_conn_close ? 0 : in_events;
+ int in_flag = bad_conn_close ? 0 : IN_EVENTS;
bool was_empty = outbuf.empty();
// If the queue is empty, we can try to write the packet out now rather than queueing it.
// Create a vector out of the (remaining part of the) packet:
try {
outbuf.emplace_back(pkt, pkt + size);
- iob.setWatchFlags(in_flag | out_events);
+ iob.setWatchFlags(in_flag | OUT_EVENTS);
return true;
}
catch (std::bad_alloc &baexc) {
return false;
}
else {
- iob.setWatchFlags(out_events);
+ iob.setWatchFlags(OUT_EVENTS);
return true;
}
}
// make them extraordinary difficult to combine into a single method.
bool ControlConn::queuePacket(std::vector<char> &&pkt) noexcept
{
- int in_flag = bad_conn_close ? 0 : in_events;
+ int in_flag = bad_conn_close ? 0 : IN_EVENTS;
bool was_empty = outbuf.empty();
if (was_empty) {
try {
outbuf.emplace_back(pkt);
- iob.setWatchFlags(in_flag | out_events);
+ iob.setWatchFlags(in_flag | OUT_EVENTS);
return true;
}
catch (std::bad_alloc &baexc) {
return false;
}
else {
- iob.setWatchFlags(out_events);
+ iob.setWatchFlags(OUT_EVENTS);
return true;
}
}
// TODO log error?
// TODO error response?
bad_conn_close = true;
- iob.setWatchFlags(out_events);
+ iob.setWatchFlags(OUT_EVENTS);
}
else {
- int out_flags = (bad_conn_close || !outbuf.empty()) ? out_events : 0;
- iob.setWatchFlags(in_events | out_flags);
+ int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
+ iob.setWatchFlags(IN_EVENTS | out_flags);
}
return false;
outpkt_index = 0;
if (outbuf.empty() && ! oom_close) {
if (! bad_conn_close) {
- iob.setWatchFlags(in_events);
+ iob.setWatchFlags(IN_EVENTS);
}
else {
return true;
#include <unistd.h>
-#include "dasync.h"
+#include "dasynq.h"
#include "dinit-log.h"
#include "control-cmds.h"
// Control connection for dinit
-using namespace dasync;
+using namespace dasynq;
using EventLoop_t = EventLoop<NullMutex>;
class ControlConn;
class ControlConnWatcher;
// forward-declaration of callback:
-static dasync::Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
+static Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
// Pointer to the control connection that is listening for rollback completion
extern ControlConn * rollback_handler_conn;
class ServiceSet;
class ServiceRecord;
-class ControlConnWatcher : public PosixBidiFdWatcher<NullMutex>
+class ControlConnWatcher : public BidiFdWatcher<NullMutex>
{
inline Rearm receiveEvent(EventLoop_t * loop, int fd, int flags) noexcept;
Rearm readReady(EventLoop_t * loop, int fd) noexcept override
{
- return receiveEvent(loop, fd, in_events);
+ return receiveEvent(loop, fd, IN_EVENTS);
}
Rearm writeReady(EventLoop_t * loop, int fd) noexcept override
{
- return receiveEvent(loop, fd, out_events);
+ return receiveEvent(loop, fd, OUT_EVENTS);
}
public:
void setWatchFlags(int flags)
{
- PosixBidiFdWatcher<NullMutex>::setWatchFlags(eventLoop, flags);
+ BidiFdWatcher<NullMutex>::setWatchFlags(eventLoop, flags);
}
void registerWith(EventLoop_t *loop, int fd, int flags)
{
this->fd = fd;
this->eventLoop = loop;
- PosixBidiFdWatcher<NullMutex>::registerWith(loop, fd, flags);
+ BidiFdWatcher<NullMutex>::registerWith(loop, fd, flags);
}
};
class ControlConn : private ServiceListener
{
- friend dasync::Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
+ friend Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
ControlConnWatcher iob;
EventLoop_t *loop;
{
bad_conn_close = true;
oom_close = true;
- iob.setWatchFlags(out_events);
+ iob.setWatchFlags(OUT_EVENTS);
}
// Process service event broadcast.
public:
ControlConn(EventLoop_t * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0)
{
- iob.registerWith(loop, fd, in_events);
+ iob.registerWith(loop, fd, IN_EVENTS);
active_control_conns++;
}
};
-static dasync::Rearm control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents)
+static Rearm control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents)
{
- using Rearm = dasync::Rearm;
-
char * cc_addr = (reinterpret_cast<char *>(watcher)) - offsetof(ControlConn, iob);
ControlConn *conn = reinterpret_cast<ControlConn *>(cc_addr);
- if (revents & in_events) {
+ if (revents & IN_EVENTS) {
if (conn->dataReady()) {
delete conn;
return Rearm::REMOVED;
}
}
- if (revents & out_events) {
+ if (revents & OUT_EVENTS) {
if (conn->sendData()) {
delete conn;
return Rearm::REMOVED;
+++ /dev/null
-namespace dasync {
-
-// Map of pid_t to void *, with possibility of reserving entries so that mappings can
-// be later added with no danger of allocator exhaustion (bad_alloc).
-class pid_map
-{
- using pair = std::pair<pid_t, void *>;
- std::unordered_map<pid_t, void *> base_map;
- std::vector<pair> backup_vector;
-
- // Number of entries in backup_vector that are actually in use (as opposed
- // to simply reserved):
- int backup_size = 0;
-
- public:
- using entry = std::pair<bool, void *>;
-
- entry get(pid_t key) noexcept
- {
- auto it = base_map.find(key);
- if (it == base_map.end()) {
- // Not in map; look in vector
- for (int i = 0; i < backup_size; i++) {
- if (backup_vector[i].first == key) {
- return entry(true, backup_vector[i].second);
- }
- }
-
- return entry(false, nullptr);
- }
-
- return entry(true, it->second);
- }
-
- entry erase(pid_t key) noexcept
- {
- auto iter = base_map.find(key);
- if (iter != base_map.end()) {
- entry r(true, iter->second);
- base_map.erase(iter);
- return r;
- }
- for (int i = 0; i < backup_size; i++) {
- if (backup_vector[i].first == key) {
- entry r(true, backup_vector[i].second);
- backup_vector.erase(backup_vector.begin() + i);
- return r;
- }
- }
- return entry(false, nullptr);
- }
-
- // Throws bad_alloc on reservation failure
- void reserve()
- {
- backup_vector.resize(backup_vector.size() + 1);
- }
-
- void add(pid_t key, void *val) // throws std::bad_alloc
- {
- base_map[key] = val;
- }
-
- void add_from_reserve(pid_t key, void *val) noexcept
- {
- try {
- base_map[key] = val;
- backup_vector.resize(backup_vector.size() - 1);
- }
- catch (std::bad_alloc &) {
- // We couldn't add into the map, use the reserve:
- backup_vector[backup_size++] = pair(key, val);
- }
- }
-};
-
-namespace {
- void sigchld_handler(int signum)
- {
- // If SIGCHLD has no handler (is ignored), SIGCHLD signals will
- // not be queued for terminated child processes. (On Linux, the
- // default disposition for SIGCHLD is to be ignored but *not* have
- // this behavior, which seems inconsistent. Setting a handler doesn't
- // hurt in any case).
- }
-}
-
-template <class Base> class ChildProcEvents : public Base
-{
- private:
- pid_map child_waiters;
-
- using SigInfo = typename Base::SigInfo;
-
- protected:
- void receiveSignal(SigInfo &siginfo, void *userdata)
- {
- if (siginfo.get_signo() == SIGCHLD) {
- int status;
- pid_t child;
- while ((child = waitpid(-1, &status, WNOHANG)) > 0) {
- pid_map::entry ent = child_waiters.erase(child);
- if (ent.first) {
- Base::receiveChildStat(child, status, ent.second);
- }
- }
- }
- else {
- Base::receiveSignal(siginfo, userdata);
- }
- }
-
- public:
- void reserveChildWatch()
- {
- child_waiters.reserve();
- }
-
- void addChildWatch(pid_t child, void *val)
- {
- child_waiters.add(child, val);
- }
-
- void addReservedChildWatch(pid_t child, void *val) noexcept
- {
- child_waiters.add_from_reserve(child, val);
- }
-
- template <typename T> void init(T *loop_mech)
- {
- loop_mech->addSignalWatch(SIGCHLD, nullptr);
- struct sigaction chld_action;
- chld_action.sa_handler = sigchld_handler;
- sigemptyset(&chld_action.sa_mask);
- chld_action.sa_flags = 0;
- sigaction(SIGCHLD, &chld_action, nullptr);
- }
-};
-
-
-} // end namespace
+++ /dev/null
-#include <system_error>
-#include <mutex>
-#include <type_traits>
-#include <unordered_map>
-#include <vector>
-
-#include <sys/epoll.h>
-#include <sys/signalfd.h>
-#include <sys/types.h>
-#include <sys/wait.h>
-
-#include <unistd.h>
-#include <signal.h>
-
-namespace dasync {
-
-template <class Base> class EpollLoop;
-
-class EpollTraits
-{
- template <class Base> friend class EpollLoop;
-
- public:
-
- class SigInfo
- {
- template <class Base> friend class EpollLoop;
-
- struct signalfd_siginfo info;
-
- public:
- int get_signo() { return info.ssi_signo; }
- int get_sicode() { return info.ssi_code; }
- int get_siint() { return info.ssi_int; }
- int get_ssiptr() { return info.ssi_ptr; }
- int get_ssiaddr() { return info.ssi_addr; }
-
- void set_signo(int signo) { info.ssi_signo = signo; }
- };
-
- class FD_r;
-
- // File descriptor optional storage. If the mechanism can return the file descriptor, this
- // class will be empty, otherwise it can hold a file descriptor.
- class FD_s {
- friend class FD_r;
-
- // Epoll doesn't return the file descriptor (it can, but it can't return both file
- // descriptor and user data).
- int fd;
- };
-
- // File descriptor reference (passed to event callback). If the mechanism can return the
- // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor
- // must be stored in an FD_s instance.
- class FD_r {
- public:
- int getFd(FD_s ss)
- {
- return ss.fd;
- }
- };
-
- const static bool has_bidi_fd_watch = true;
- const static bool has_separate_rw_fd_watches = false;
-};
-
-
-template <class Base> class EpollLoop : public Base
-{
- int epfd; // epoll fd
- int sigfd; // signalfd fd; -1 if not initialised
- sigset_t sigmask;
-
- std::unordered_map<int, void *> sigdataMap;
-
- // Base contains:
- // lock - a lock that can be used to protect internal structure.
- // receive*() methods will be called with lock held.
- // receiveSignal(SigInfo &, user *) noexcept
- // receiveFdEvent(FD_r, user *, int flags) noexcept
-
- using SigInfo = EpollTraits::SigInfo;
- using FD_r = typename EpollTraits::FD_r;
-
- void processEvents(epoll_event *events, int r)
- {
- std::lock_guard<decltype(Base::lock)> guard(Base::lock);
-
- for (int i = 0; i < r; i++) {
- void * ptr = events[i].data.ptr;
-
- if (ptr == &sigfd) {
- // Signal
- SigInfo siginfo;
- while (true) {
- int r = read(sigfd, &siginfo.info, sizeof(siginfo.info));
- if (r == -1) break;
- if (siginfo.get_signo() != SIGCHLD) {
- // TODO remove the special exception for SIGCHLD?
- sigdelset(&sigmask, siginfo.get_signo());
- }
- auto iter = sigdataMap.find(siginfo.get_signo());
- if (iter != sigdataMap.end()) {
- void *userdata = (*iter).second;
- Base::receiveSignal(siginfo, userdata);
- }
- }
- signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
- }
- else {
- int flags = 0;
- (events[i].events & EPOLLIN) && (flags |= in_events);
- (events[i].events & EPOLLHUP) && (flags |= in_events);
- (events[i].events & EPOLLOUT) && (flags |= out_events);
- (events[i].events & EPOLLERR) && (flags |= err_events);
- Base::receiveFdEvent(*this, FD_r(), ptr, flags);
- }
- }
- }
-
- public:
-
- /**
- * EpollLoop constructor.
- *
- * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
- */
- EpollLoop() : sigfd(-1)
- {
- epfd = epoll_create1(EPOLL_CLOEXEC);
- if (epfd == -1) {
- throw std::system_error(errno, std::system_category());
- }
- sigemptyset(&sigmask);
- Base::init(this);
- }
-
- ~EpollLoop()
- {
- close(epfd);
- if (sigfd != -1) {
- close(sigfd);
- }
- }
-
- // flags: in_events | out_events
- void addFdWatch(int fd, void *userdata, int flags)
- {
- struct epoll_event epevent;
- // epevent.data.fd = fd;
- epevent.data.ptr = userdata;
- epevent.events = 0;
-
- if (flags & one_shot) {
- epevent.events = EPOLLONESHOT;
- }
- if (flags & in_events) {
- epevent.events |= EPOLLIN;
- }
- if (flags & out_events) {
- epevent.events |= EPOLLOUT;
- }
-
- if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epevent) == -1) {
- throw new std::system_error(errno, std::system_category());
- }
- }
-
- void removeFdWatch(int fd, int flags) noexcept
- {
- epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr);
- }
-
- void removeFdWatch_nolock(int fd, int flags) noexcept
- {
- removeFdWatch(fd, flags);
- }
-
- // Note this will *replace* the old flags with the new, that is,
- // it can enable *or disable* read/write events.
- void enableFdWatch(int fd, void *userdata, int flags) noexcept
- {
- struct epoll_event epevent;
- // epevent.data.fd = fd;
- epevent.data.ptr = userdata;
- epevent.events = 0;
-
- if (flags & one_shot) {
- epevent.events = EPOLLONESHOT;
- }
- if (flags & in_events) {
- epevent.events |= EPOLLIN;
- }
- if (flags & out_events) {
- epevent.events |= EPOLLOUT;
- }
-
- if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) {
- // Shouldn't be able to fail
- // throw new std::system_error(errno, std::system_category());
- }
- }
-
- void enableFdWatch_nolock(int fd, void *userdata, int flags)
- {
- enableFdWatch(fd, userdata, flags);
- }
-
- void disableFdWatch(int fd, int flags) noexcept
- {
- struct epoll_event epevent;
- // epevent.data.fd = fd;
- epevent.data.ptr = nullptr;
- epevent.events = 0;
-
- // Epoll documentation says that hangup will still be reported, need to check
- // whether this is really the case. Suspect it is really only the case if
- // EPOLLIN is set.
- if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) {
- // Let's assume that this can't fail.
- // throw new std::system_error(errno, std::system_category());
- }
- }
-
- void disableFdWatch_nolock(int fd, int flags) noexcept
- {
- disableFdWatch(fd, flags);
- }
-
- // Note signal should be masked before call.
- void addSignalWatch(int signo, void *userdata)
- {
- std::lock_guard<decltype(Base::lock)> guard(Base::lock);
-
- sigdataMap[signo] = userdata;
-
- // Modify the signal fd to watch the new signal
- bool was_no_sigfd = (sigfd == -1);
- sigaddset(&sigmask, signo);
- sigfd = signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
- if (sigfd == -1) {
- throw new std::system_error(errno, std::system_category());
- }
-
- if (was_no_sigfd) {
- // Add the signalfd to the epoll set.
- struct epoll_event epevent;
- epevent.data.ptr = &sigfd;
- epevent.events = EPOLLIN;
- // No need for EPOLLONESHOT - we can pull the signals out
- // as we see them.
- if (epoll_ctl(epfd, EPOLL_CTL_ADD, sigfd, &epevent) == -1) {
- close(sigfd);
- throw new std::system_error(errno, std::system_category());
- }
- }
- }
-
- // Note, called with lock held:
- void rearmSignalWatch_nolock(int signo) noexcept
- {
- sigaddset(&sigmask, signo);
- signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
- }
-
- void removeSignalWatch_nolock(int signo) noexcept
- {
- sigdelset(&sigmask, signo);
- signalfd(sigfd, &sigmask, 0);
- }
-
- void removeSignalWatch(int signo) noexcept
- {
- std::lock_guard<decltype(Base::lock)> guard(Base::lock);
- removeSignalWatch_nolock(signo);
- }
-
- // If events are pending, process an unspecified number of them.
- // If no events are pending, wait until one event is received and
- // process this event (and possibly any other events received
- // simultaneously).
- // If processing an event removes a watch, there is a possibility
- // that the watched event will still be reported (if it has
- // occurred) before pullEvents() returns.
- //
- // do_wait - if false, returns immediately if no events are
- // pending.
- void pullEvents(bool do_wait)
- {
- epoll_event events[16];
- int r = epoll_wait(epfd, events, 16, do_wait ? -1 : 0);
- if (r == -1 || r == 0) {
- // signal or no events
- return;
- }
-
- processEvents(events, r);
- }
-
- // If events are pending, process one of them.
- // If no events are pending, wait until one event is received and
- // process this event.
- //
- // do_wait - if false, returns immediately if no events are
- // pending.
- void pullOneEvent(bool do_wait)
- {
- epoll_event events[1];
- int r = epoll_wait(epfd, events, 1, do_wait ? -1 : 0);
- if (r == -1 || r == 0) {
- // signal or no events
- return;
- }
-
- processEvents(events, r);
- }
-
- // Interrupt any current poll operation (pullEvents/pullOneEvent), causing
- // it to to return immediately.
- void interruptWait()
- {
- // TODO
- }
-};
-
-} // end namespace
+++ /dev/null
-namespace dasync {
-
-// Event type bits
-constexpr unsigned int in_events = 1;
-constexpr unsigned int out_events = 2;
-constexpr unsigned int err_events = 4;
-
-constexpr unsigned int one_shot = 8;
-
-// Masks:
-constexpr unsigned int IO_EVENTS = in_events | out_events;
-
-}
+++ /dev/null
-#include <system_error>
-#include <mutex>
-#include <type_traits>
-#include <unordered_map>
-#include <vector>
-
-#ifdef __OpenBSD__
-#include <sys/signal.h> // for __thrsigdivert aka sigtimedwait
-#include <sys/syscall.h>
-extern "C" {
- int __thrsigdivert(sigset_t set, siginfo_t *info, const struct timespec * timeout);
-}
-#endif
-
-#include <sys/event.h>
-#include <sys/time.h>
-#include <sys/types.h>
-#include <sys/wait.h>
-
-#include <unistd.h>
-#include <signal.h>
-
-namespace dasync {
-
-template <class Base> class KqueueLoop;
-
-class KqueueTraits
-{
- template <class Base> friend class KqueueLoop;
-
- public:
-
- class SigInfo
- {
- template <class Base> friend class KqueueLoop;
-
- siginfo_t info;
-
- public:
- int get_signo() { return info.si_signo; }
- int get_sicode() { return info.si_code; }
- char * get_ssiaddr() { return info.si_addr; }
-
- void set_signo(int signo) { info.si_signo = signo; }
- };
-
- class FD_r;
-
- // File descriptor optional storage. If the mechanism can return the file descriptor, this
- // class will be empty, otherwise it can hold a file descriptor.
- class FD_s {
- // Epoll doesn't return the file descriptor (it can, but it can't return both file
- // descriptor and user data).
- // TODO make true empty.
- };
-
- // File descriptor reference (passed to event callback). If the mechanism can return the
- // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor
- // must be stored in an FD_s instance.
- class FD_r {
- int fd;
- public:
- int getFd(FD_s ss)
- {
- return fd;
- }
- FD_r(int nfd) : fd(nfd)
- {
- }
- };
-
- const static bool has_separate_rw_fd_watches = true;
-};
-
-#if defined(__OpenBSD__)
-// OpenBSD has no sigtimedwait (or sigwaitinfo) but does have "__thrsigdivert", which is
-// essentially an incomplete version of the same thing. Discussion with OpenBSD developer
-// Ted Unangst suggested that the siginfo_t structure returned might not always have all
-// fields set correctly. Furthermore there is a bug such that specifying a zero timeout (or
-// indeed any timeout less than a tick) results in NO timeout.
-static inline int sigtimedwait(const sigset_t *ssp, siginfo_t *info, struct timespec *timeout)
-{
- // We know that we're only called with a timeout of 0 (which doesn't work properly) and
- // that we safely overwrite the timeout. So, we set tv_nsec to an invalid value, which
- // will cause EINVAL to be returned, but will still pick up any pending signals *first*.
- timeout->tv_nsec = 1000000001;
- return __thrsigdivert(*ssp, info, timeout);
-}
-#endif
-
-template <class Base> class KqueueLoop : public Base
-{
- int kqfd; // kqueue fd
- sigset_t sigmask; // enabled signal watch mask
-
- // Map of signal number to user data pointer. If kqueue had been better thought-through,
- // we shouldn't need this. Although we can associate user data with an EVFILT_SIGNAL kqueue
- // filter, the problem is that the kqueue signal report *coexists* with the regular signal
- // delivery mechanism without having any connection to it. Whereas regular signals can be
- // queued (especially "realtime" signals, via sigqueue()), kqueue just maintains a counter
- // of delivery attempts and clears this when we read the event. What this means is that
- // kqueue won't necessarily tell us if signals are pending, in the case that:
- // 1) it already reported the attempted signal delivery and
- // 2) more than one of the same signal was pending at that time and
- // 3) no more deliveries of the same signal have been attempted in the meantime.
- // Of course, if kqueue doesn't report the signal, then it doesn't give us the data associated
- // with the event, so we need to maintain that separately too:
- std::unordered_map<int, void *> sigdataMap;
-
- // Base contains:
- // lock - a lock that can be used to protect internal structure.
- // receive*() methods will be called with lock held.
- // receiveSignal(SigInfo &, user *) noexcept
- // receiveFdEvent(FD_r, user *, int flags) noexcept
-
- using SigInfo = KqueueTraits::SigInfo;
- using FD_r = typename KqueueTraits::FD_r;
-
- void processEvents(struct kevent *events, int r)
- {
- std::lock_guard<decltype(Base::lock)> guard(Base::lock);
-
- for (int i = 0; i < r; i++) {
- if (events[i].filter == EVFILT_SIGNAL) {
- SigInfo siginfo;
- sigset_t sset;
- sigemptyset(&sset);
- sigaddset(&sset, events[i].ident);
- struct timespec timeout;
- timeout.tv_sec = 0;
- timeout.tv_nsec = 0;
- if (sigtimedwait(&sset, &siginfo.info, &timeout) > 0) {
- Base::receiveSignal(siginfo, (void *)events[i].udata);
- }
-
- if (events[i].ident != SIGCHLD) {
- sigdelset(&sigmask, events[i].ident);
- events[i].flags = EV_DISABLE;
- }
- else {
- // TODO can we remove this SIGCHLD hack?
- events[i].flags = EV_ENABLE;
- }
- }
- else if (events[i].filter == EVFILT_READ || events[i].filter == EVFILT_WRITE) {
- int flags = events[i].filter == EVFILT_READ ? in_events : out_events;
- Base::receiveFdEvent(*this, FD_r(events[i].ident), events[i].udata, flags);
- events[i].flags = EV_DISABLE | EV_CLEAR;
- // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for
- // another connection).
- }
- else {
- events[i].flags = EV_DISABLE;
- }
- }
-
- // Now we disable all received events, to simulate EV_DISPATCH:
- kevent(kqfd, events, r, nullptr, 0, nullptr);
- }
-
- public:
-
- /**
- * KqueueLoop constructor.
- *
- * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
- */
- KqueueLoop()
- {
- kqfd = kqueue();
- if (kqfd == -1) {
- throw std::system_error(errno, std::system_category());
- }
- sigemptyset(&sigmask);
- Base::init(this);
- }
-
- ~KqueueLoop()
- {
- close(kqfd);
- }
-
- void setFilterEnabled(short filterType, uintptr_t ident, bool enable)
- {
- struct kevent kev;
- EV_SET(&kev, ident, filterType, enable ? EV_ENABLE : EV_DISABLE, 0, 0, 0);
- kevent(kqfd, &kev, 1, nullptr, 0, nullptr);
- }
-
- void removeFilter(short filterType, uintptr_t ident)
- {
- struct kevent kev;
- EV_SET(&kev, ident, filterType, EV_DELETE, 0, 0, 0);
- kevent(kqfd, &kev, 1, nullptr, 0, nullptr);
- }
-
- // flags: in_events | out_events
- void addFdWatch(int fd, void *userdata, int flags)
- {
- // TODO kqueue doesn't support EVFILE_WRITE on file fd's :/
-
- short filter = (flags & in_events) ? EVFILT_READ : EVFILT_WRITE;
-
- struct kevent kev;
- EV_SET(&kev, fd, filter, EV_ADD, 0, 0, userdata);
- if (kevent(kqfd, &kev, 1, nullptr, 0, nullptr) == -1) {
- throw new std::system_error(errno, std::system_category());
- }
- }
-
- void removeFdWatch(int fd, int flags)
- {
- removeFilter((flags & in_events) ? EVFILT_READ : EVFILT_WRITE, fd);
- }
-
- void removeFdWatch_nolock(int fd, int flags)
- {
- removeFdWatch(fd, flags);
- }
-
- void enableFdWatch(int fd, void *userdata, int flags)
- {
- setFilterEnabled((flags & in_events) ? EVFILT_READ : EVFILT_WRITE, fd, true);
- }
-
- void enableFdWatch_nolock(int fd, void *userdata, int flags)
- {
- enableFdWatch(fd, userdata, flags);
- }
-
- void disableFdWatch(int fd, int flags)
- {
- setFilterEnabled((flags & in_events) ? EVFILT_READ : EVFILT_WRITE, fd, false);
- }
-
- void disableFdWatch_nolock(int fd, int flags)
- {
- disableFdWatch(fd, flags);
- }
-
- // Note signal should be masked before call.
- void addSignalWatch(int signo, void *userdata)
- {
- std::lock_guard<decltype(Base::lock)> guard(Base::lock);
-
- sigdataMap[signo] = userdata;
- sigaddset(&sigmask, signo);
-
- struct kevent evt;
- EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ADD, 0, 0, userdata);
- // TODO use EV_DISPATCH if available (not on OpenBSD)
-
- if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) {
- throw new std::system_error(errno, std::system_category());
- }
- }
-
- // Note, called with lock held:
- void rearmSignalWatch_nolock(int signo) noexcept
- {
- sigaddset(&sigmask, signo);
-
- struct kevent evt;
- EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ENABLE, 0, 0, 0);
- // TODO use EV_DISPATCH if available (not on OpenBSD)
-
- kevent(kqfd, &evt, 1, nullptr, 0, nullptr);
- }
-
- void removeSignalWatch_nolock(int signo) noexcept
- {
- sigdelset(&sigmask, signo);
-
- struct kevent evt;
- EV_SET(&evt, signo, EVFILT_SIGNAL, EV_DELETE, 0, 0, 0);
-
- kevent(kqfd, &evt, 1, nullptr, 0, nullptr);
- }
-
- void removeSignalWatch(int signo) noexcept
- {
- std::lock_guard<decltype(Base::lock)> guard(Base::lock);
- removeSignalWatch_nolock(signo);
- }
-
- // If events are pending, process an unspecified number of them.
- // If no events are pending, wait until one event is received and
- // process this event (and possibly any other events received
- // simultaneously).
- // If processing an event removes a watch, there is a possibility
- // that the watched event will still be reported (if it has
- // occurred) before pullEvents() returns.
- //
- // do_wait - if false, returns immediately if no events are
- // pending.
- void pullEvents(bool do_wait)
- {
- // We actually need to check pending signals, since
- // kqueue can count signals as they are delivered but the count is
- // cleared when we poll the kqueue, meaning that signals might still
- // be pending if they were queued multiple times at the last poll.
-
- // TODO we should only poll for signals that *have* been reported
- // as being raised more than once prior via kevent, rather than all
- // signals that have been registered - in many cases that will allow
- // us to skip the sigtimedwait call altogether.
-
- {
- std::lock_guard<decltype(Base::lock)> guard(Base::lock);
- struct timespec timeout;
- timeout.tv_sec = 0;
- timeout.tv_nsec = 0;
- SigInfo siginfo;
- int rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout);
- while (rsigno > 0) {
- // TODO avoid this hack for SIGCHLD somehow
- if (rsigno != SIGCHLD) {
- sigdelset(&sigmask, rsigno);
- // TODO accumulate and disable multiple filters with a single kevents call
- // rather than disabling each individually
- setFilterEnabled(EVFILT_SIGNAL, rsigno, false);
- }
- Base::receiveSignal(siginfo, sigdataMap[rsigno]);
- rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout);
- }
- }
-
- struct kevent events[16];
- struct timespec ts;
- ts.tv_sec = 0;
- ts.tv_nsec = 0;
- int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts);
- if (r == -1 || r == 0) {
- // signal or no events
- return;
- }
-
- processEvents(events, r);
- }
-
- // If events are pending, process one of them.
- // If no events are pending, wait until one event is received and
- // process this event.
- //
- // do_wait - if false, returns immediately if no events are
- // pending.
- void pullOneEvent(bool do_wait)
- {
- // TODO must check for pending signals as per pullEvents()
- struct kevent events[1];
- struct timespec ts;
- ts.tv_sec = 0;
- ts.tv_nsec = 0;
- int r = kevent(kqfd, nullptr, 0, events, 1, do_wait ? nullptr : &ts);
- if (r == -1 || r == 0) {
- // signal or no events
- return;
- }
-
- processEvents(events, r);
- }
-
- // Interrupt any current poll operation (pullEvents/pullOneEvent), causing
- // it to to return immediately.
- void interruptWait()
- {
- // TODO
- }
-};
-
-} // end namespace
+++ /dev/null
-#ifndef DASYNC_H_INCLUDED
-#define DASYNC_H_INCLUDED
-
-#if defined(__OpenBSD__)
-#define HAVE_KQUEUE 1
-#endif
-
-#if defined(__linux__)
-#define HAVE_EPOLL 1
-#endif
-
-#include "dasync-flags.h"
-
-#if defined(HAVE_KQUEUE)
-#include "dasync-kqueue.h"
-#include "dasync-childproc.h"
-namespace dasync {
- template <typename T> using Loop = KqueueLoop<T>;
- using LoopTraits = KqueueTraits;
-}
-#elif defined(HAVE_EPOLL)
-#include "dasync-epoll.h"
-#include "dasync-childproc.h"
-namespace dasync {
- template <typename T> using Loop = EpollLoop<T>;
- using LoopTraits = EpollTraits;
-}
-#endif
-#include <atomic>
-#include <condition_variable>
-#include <cstdint>
-#include <cstddef>
-
-#include "dmutex.h"
-
-
-
-// TODO consider using atomic variables instead of explicit locking where appropriate
-
-// Allow optimisation of empty classes by including this in the body:
-// May be included as the last entry for a class which is only
-// _potentially_ empty.
-
-/*
-#ifdef __GNUC__
-#ifdef __clang__
-#define EMPTY_BODY private: char empty_fill[0];
-#else
-#define EMPTY_BODY private: char empty_fill[0];
-#endif
-#else
-#define EMPTY_BODY
-#endif
-*/
-
-namespace dasync {
-
-
-/**
- * Values for rearm/disarm return from event handlers
- */
-enum class Rearm
-{
- /** Re-arm the event watcher so that it receives further events */
- REARM,
- /** Disarm the event watcher so that it receives no further events, until it is re-armed explicitly */
- DISARM,
- /** Leave in current armed/disarmed state */
- NOOP,
- /** Remove the event watcher (and call "removed" callback) */
- REMOVE,
- /** The watcher has been removed - don't touch it! */
- REMOVED
-// TODO: add a REQUEUE option, which means, "I didn't complete input/output, run me again soon"
-};
-
-
-// Forward declarations:
-template <typename T_Mutex> class EventLoop;
-template <typename T_Mutex> class PosixFdWatcher;
-template <typename T_Mutex> class PosixBidiFdWatcher;
-template <typename T_Mutex> class PosixSignalWatcher;
-template <typename T_Mutex> class PosixChildWatcher;
-
-// Information about a received signal.
-// This is essentially a wrapper for the POSIX siginfo_t; its existence allows for mechanisms that receive
-// equivalent signal information in a different format (eg signalfd on Linux).
-using SigInfo = LoopTraits::SigInfo;
-
-namespace dprivate {
- // (non-public API)
-
- enum class WatchType
- {
- SIGNAL,
- FD,
- CHILD,
- SECONDARYFD
- };
-
- template <typename T_Mutex, typename Traits> class EventDispatch;
-
- // For FD watchers:
- // Use this watch flag to indicate that in and out events should be reported separately,
- // that is, watcher should not be disabled until all watched event types are queued.
- constexpr static int multi_watch = 4;
-
- // Represents a queued event notification
- class BaseWatcher
- {
- template <typename T_Mutex, typename Traits> friend class EventDispatch;
- template <typename T_Mutex> friend class dasync::EventLoop;
-
- protected:
- WatchType watchType;
- int active : 1;
- int deleteme : 1;
-
- BaseWatcher * prev;
- BaseWatcher * next;
-
- public:
-
- // Perform initialisation necessary before registration with an event loop
- void init()
- {
- active = false;
- deleteme = false;
- prev = nullptr;
- next = nullptr;
- }
-
- BaseWatcher(WatchType wt) noexcept : watchType(wt) { }
-
- virtual ~BaseWatcher() noexcept { }
-
- // Called when the watcher has been removed.
- // It is guaranteed by the caller that:
- // - the dispatch method is not currently running
- // - the dispatch method will not be called.
- virtual void watchRemoved() noexcept
- {
- // TODO this "delete" behaviour could be dependent on a flag, perhaps?
- // delete this;
- }
- };
-
- // Base signal event - not part of public API
- template <typename T_Mutex>
- class BaseSignalWatcher : public BaseWatcher
- {
- template <typename M, typename Traits> friend class EventDispatch;
- friend class dasync::EventLoop<T_Mutex>;
-
- protected:
- SigInfo siginfo;
- BaseSignalWatcher() : BaseWatcher(WatchType::SIGNAL) { }
-
- public:
- typedef SigInfo &SigInfo_p;
-
- virtual Rearm gotSignal(EventLoop<T_Mutex> * eloop, int signo, SigInfo_p siginfo) = 0;
- };
-
- template <typename T_Mutex>
- class BaseFdWatcher : public BaseWatcher
- {
- template <typename, typename Traits> friend class EventDispatch;
- friend class dasync::EventLoop<T_Mutex>;
-
- protected:
- int watch_fd;
- int watch_flags;
- int event_flags;
-
- BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { }
-
- public:
- virtual Rearm gotEvent(EventLoop<T_Mutex> * eloop, int fd, int flags) = 0;
- };
-
- template <typename T_Mutex>
- class BaseBidiFdWatcher : public BaseFdWatcher<T_Mutex>
- {
- template <typename, typename Traits> friend class EventDispatch;
- friend class dasync::EventLoop<T_Mutex>;
-
- // This should never actually get called:
- Rearm gotEvent(EventLoop<T_Mutex> * eloop, int fd, int flags) final
- {
- return Rearm::REARM; // should not be reachable.
- };
-
- protected:
-
- // The main instance is the "input" watcher only; we keep a secondary watcher
- // with a secondary set of flags for the "output" watcher:
- BaseWatcher outWatcher = BaseWatcher(WatchType::SECONDARYFD);
-
- int read_removed : 1; // read watch removed?
- int write_removed : 1; // write watch removed?
-
- public:
- virtual Rearm readReady(EventLoop<T_Mutex> * eloop, int fd) noexcept = 0;
- virtual Rearm writeReady(EventLoop<T_Mutex> * eloop, int fd) noexcept = 0;
- };
-
- template <typename T_Mutex>
- class BaseChildWatcher : public BaseWatcher
- {
- template <typename, typename Traits> friend class EventDispatch;
- friend class dasync::EventLoop<T_Mutex>;
-
- protected:
- pid_t watch_pid;
- int child_status;
-
- BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { }
-
- public:
- virtual void gotTermStat(EventLoop<T_Mutex> * eloop, pid_t child, int status) = 0;
- };
-
- // Classes for implementing a fair(ish) wait queue.
- // A queue node can be signalled when it reaches the head of
- // the queue.
-
- template <typename T_Mutex> class waitqueue;
- template <typename T_Mutex> class waitqueue_node;
-
- // Select an appropriate conditiona variable type for a mutex:
- // condition_variable if mutex is std::mutex, or condition_variable_any
- // otherwise.
- template <class T_Mutex> class condvarSelector;
-
- template <> class condvarSelector<std::mutex>
- {
- public:
- typedef std::condition_variable condvar;
- };
-
- template <class T_Mutex> class condvarSelector
- {
- public:
- typedef std::condition_variable_any condvar;
- };
-
- template <> class waitqueue_node<NullMutex>
- {
- // Specialised waitqueue_node for NullMutex.
- // TODO can this be reduced to 0 data members?
- friend class waitqueue<NullMutex>;
- waitqueue_node * next = nullptr;
-
- public:
- void wait(std::unique_lock<NullMutex> &ul) { }
- void signal() { }
- };
-
- template <typename T_Mutex> class waitqueue_node
- {
- typename condvarSelector<T_Mutex>::condvar condvar;
- friend class waitqueue<T_Mutex>;
- waitqueue_node * next = nullptr;
-
- public:
- void signal()
- {
- condvar.notify_one();
- }
-
- void wait(std::unique_lock<T_Mutex> &mutex_lock)
- {
- condvar.wait(mutex_lock);
- }
- };
-
- template <typename T_Mutex> class waitqueue
- {
- waitqueue_node<T_Mutex> * tail = nullptr;
- waitqueue_node<T_Mutex> * head = nullptr;
-
- public:
- waitqueue_node<T_Mutex> * unqueue()
- {
- head = head->next;
- return head;
- }
-
- waitqueue_node<T_Mutex> * getHead()
- {
- return head;
- }
-
- void queue(waitqueue_node<T_Mutex> *node)
- {
- if (tail) {
- tail->next = node;
- }
- else {
- head = node;
- }
- }
- };
-
- // This class serves as the base class (mixin) for the AEN mechanism class.
- // Note that EventDispatch, here, and EventLoop (below) are really two sides of one coin;
- // they do not work independently. The mixin pattern that we use to avoid dynamic dispatch
- // forces them to be two seperate classes, however.
- //
- // The EventDispatch class maintains the queued event data structures. It inserts watchers
- // into the queue when eventes are received (receiveXXX methods).
- template <typename T_Mutex, typename Traits> class EventDispatch : public Traits
- {
- friend class EventLoop<T_Mutex>;
-
- // queue data structure/pointer
- BaseWatcher * first;
-
- using BaseSignalWatcher = dasync::dprivate::BaseSignalWatcher<T_Mutex>;
- using BaseFdWatcher = dasync::dprivate::BaseFdWatcher<T_Mutex>;
- using BaseBidiFdWatcher = dasync::dprivate::BaseBidiFdWatcher<T_Mutex>;
- using BaseChildWatcher = dasync::dprivate::BaseChildWatcher<T_Mutex>;
-
- void queueWatcher(BaseWatcher *bwatcher)
- {
- // Put in queue:
- if (first == nullptr) {
- bwatcher->prev = bwatcher;
- bwatcher->next = bwatcher;
- first = bwatcher;
- }
- else {
- first->prev->next = bwatcher;
- bwatcher->prev = first->prev;
- first->prev = bwatcher;
- bwatcher->next = first;
- }
- }
-
- bool isQueued(BaseWatcher *bwatcher)
- {
- return bwatcher->prev != nullptr;
- }
-
- void dequeueWatcher(BaseWatcher *bwatcher)
- {
- if (bwatcher->prev == bwatcher) {
- // Only item in queue
- first = nullptr;
- }
- else {
- if (first == bwatcher) first = first->next;
- bwatcher->prev->next = bwatcher->next;
- bwatcher->next->prev = bwatcher->prev;
- }
-
- bwatcher->prev = nullptr;
- bwatcher->next = nullptr;
- }
-
- protected:
- T_Mutex lock;
-
- void receiveSignal(typename Traits::SigInfo & siginfo, void * userdata)
- {
- BaseSignalWatcher * bwatcher = static_cast<BaseSignalWatcher *>(userdata);
- bwatcher->siginfo = siginfo;
- queueWatcher(bwatcher);
- }
-
- template <typename T>
- void receiveFdEvent(T &loop_mech, typename Traits::FD_r fd_r, void * userdata, int flags)
- {
- BaseFdWatcher * bfdw = static_cast<BaseFdWatcher *>(userdata);
-
- bfdw->event_flags |= flags;
-
- BaseWatcher * bwatcher = bfdw;
-
- bool is_multi_watch = bfdw->watch_flags & multi_watch;
- if (is_multi_watch) {
- BaseBidiFdWatcher *bbdw = static_cast<BaseBidiFdWatcher *>(bwatcher);
- if (flags & in_events && flags & out_events) {
- // Queue the secondary watcher first:
- queueWatcher(&bbdw->outWatcher);
- }
- else if (flags & out_events) {
- // Use the secondary watcher for queueing:
- bwatcher = &(bbdw->outWatcher);
- }
- }
-
- queueWatcher(bwatcher);
-
- if (! LoopTraits::has_separate_rw_fd_watches) {
- // If this is a bidirectional fd-watch, it has been disabled in *both* directions
- // as the event was delivered. However, the other direction should not be disabled
- // yet, so we need to re-enable:
- int in_out_mask = in_events | out_events;
- if (is_multi_watch && bfdw->event_flags != (bfdw->watch_flags & in_out_mask)) {
- // We need to re-enable the other channel now:
- loop_mech.enableFdWatch_nolock(bfdw->watch_fd, userdata,
- (bfdw->watch_flags & ~(bfdw->event_flags)) | one_shot);
- }
- }
- }
-
- void receiveChildStat(pid_t child, int status, void * userdata)
- {
- BaseChildWatcher * watcher = static_cast<BaseChildWatcher *>(userdata);
- watcher->child_status = status;
- queueWatcher(watcher);
- }
-
- // Pull a single event from the queue
- BaseWatcher * pullEvent()
- {
- BaseWatcher * r = first;
- if (r != nullptr) {
- dequeueWatcher(r);
- }
- return r;
- }
-
- void issueDelete(BaseWatcher *watcher) noexcept
- {
- // This is only called when the attention lock is held, so if the watcher is not
- // active/queued now, it cannot become active (and will not be reported with an event)
- // during execution of this function.
-
- lock.lock();
-
- // TODO this needs to handle multi-watch (BidiFdWatcher) properly
-
- if (watcher->active) {
- // If the watcher is active, set deleteme true; the watcher will be removed
- // at the end of current processing (i.e. when active is set false).
- watcher->deleteme = true;
- lock.unlock();
- }
- else {
- // Actually do the delete.
- if (isQueued(watcher)) {
- dequeueWatcher(watcher);
- }
-
- lock.unlock();
- watcher->watchRemoved();
- }
- }
-
- void issueDelete(BaseBidiFdWatcher *watcher) noexcept
- {
- lock.lock();
-
- if (watcher->active) {
- watcher->deleteme = true;
- }
- else {
- if (isQueued(watcher)) {
- dequeueWatcher(watcher);
- }
-
- watcher->read_removed = true;
- }
-
- BaseWatcher *secondary = &(watcher->outWatcher);
- if (secondary->active) {
- secondary->deleteme = true;
- }
- else {
- if (isQueued(secondary)) {
- dequeueWatcher(secondary);
- }
-
- watcher->write_removed = true;
- }
-
- if (watcher->read_removed && watcher->write_removed) {
- lock.unlock();
- watcher->watchRemoved();
- }
- else {
- lock.unlock();
- }
- }
- };
-}
-
-
-template <typename T_Mutex> class EventLoop
-{
- friend class PosixFdWatcher<T_Mutex>;
- friend class PosixBidiFdWatcher<T_Mutex>;
- friend class PosixSignalWatcher<T_Mutex>;
- friend class PosixChildWatcher<T_Mutex>;
-
- template <typename T, typename U> using EventDispatch = dprivate::EventDispatch<T,U>;
- template <typename T> using waitqueue = dprivate::waitqueue<T>;
- template <typename T> using waitqueue_node = dprivate::waitqueue_node<T>;
- using BaseWatcher = dprivate::BaseWatcher;
- using BaseSignalWatcher = dprivate::BaseSignalWatcher<T_Mutex>;
- using BaseFdWatcher = dprivate::BaseFdWatcher<T_Mutex>;
- using BaseBidiFdWatcher = dprivate::BaseBidiFdWatcher<T_Mutex>;
- using BaseChildWatcher = dprivate::BaseChildWatcher<T_Mutex>;
- using WatchType = dprivate::WatchType;
-
- Loop<ChildProcEvents<EventDispatch<T_Mutex, LoopTraits>>> loop_mech;
-
- // There is a complex problem with most asynchronous event notification mechanisms
- // when used in a multi-threaded environment. Generally, a file descriptor or other
- // event type that we are watching will be associated with some data used to manage
- // that event source. For example a web server needs to maintain information about
- // each client connection, such as the state of the connection (what protocol version
- // has been negotiated, etc; if a transfer is taking place, what file is being
- // transferred etc).
- //
- // However, sometimes we want to remove an event source (eg webserver wants to drop
- // a connection) and delete the associated data. The problem here is that it is
- // difficult to be sure when it is ok to actually remove the data, since when
- // requesting to unwatch the source in one thread it is still possible that an
- // event from that source is just being reported to another thread (in which case
- // the data will be needed).
- //
- // To solve that, we:
- // - allow only one thread to poll for events at a time, using a lock
- // - use the same lock to prevent polling, if we want to unwatch an event source
- // - generate an event to interrupt any polling that may already be occurring in
- // another thread
- // - mark handlers as active if they are currently executing, and
- // - when removing an active handler, simply set a flag which causes it to be
- // removed once the current processing is finished, rather than removing it
- // immediately.
- //
- // In particular the lock mechanism for preventing multiple threads polling and
- // for allowing polling to be interrupted is tricky. We can't use a simple mutex
- // since there is significant chance that it will be highly contended and there
- // are no guarantees that its acquisition will be fair. In particular, we don't
- // want a thread that is trying to unwatch a source being starved while another
- // thread polls the event source.
- //
- // So, we use two wait queues protected by a single mutex. The "attn_waitqueue"
- // (attention queue) is the high-priority queue, used for threads wanting to
- // unwatch event sources. The "wait_waitquueue" is the queue used by threads
- // that wish to actually poll for events.
- // - The head of the "attn_waitqueue" is always the holder of the lock
- // - Therefore, a poll-waiter must be moved from the wait_waitqueue to the
- // attn_waitqueue to actually gain the lock. This is only done if the
- // attn_waitqueue is otherwise empty.
- // - The mutex only protects manipulation of the wait queues, and so should not
- // be highly contended.
-
- T_Mutex wait_lock; // wait lock, used to prevent multiple threads from waiting
- // on the event queue simultaneously.
- waitqueue<T_Mutex> attn_waitqueue;
- waitqueue<T_Mutex> wait_waitqueue;
-
- T_Mutex &getBaseLock()
- {
- return loop_mech.lock;
- }
-
- void registerSignal(BaseSignalWatcher *callBack, int signo)
- {
- loop_mech.addSignalWatch(signo, callBack);
- }
-
- void deregister(BaseSignalWatcher *callBack, int signo) noexcept
- {
- loop_mech.removeSignalWatch(signo);
-
- waitqueue_node<T_Mutex> qnode;
- getAttnLock(qnode);
-
- EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
- ed.issueDelete(callBack);
-
- releaseLock(qnode);
- }
-
- void registerFd(BaseFdWatcher *callback, int fd, int eventmask)
- {
- loop_mech.addFdWatch(fd, callback, eventmask | one_shot);
- }
-
- void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask)
- {
- if (LoopTraits::has_separate_rw_fd_watches) {
- // TODO
- }
- else {
- loop_mech.addFdWatch(fd, callback, eventmask | one_shot);
- }
- }
-
- void setFdEnabled(BaseWatcher *watcher, int fd, int watch_flags, bool enabled)
- {
- if (enabled) {
- loop_mech.enableFdWatch(fd, watcher, watch_flags | one_shot);
- }
- else {
- loop_mech.disableFdWatch(fd, watch_flags);
- }
- }
-
- void setFdEnabled_nolock(BaseWatcher *watcher, int fd, int watch_flags, bool enabled)
- {
- if (enabled) {
- loop_mech.enableFdWatch_nolock(fd, watcher, watch_flags | one_shot);
- }
- else {
- loop_mech.disableFdWatch_nolock(fd, watch_flags);
- }
- }
-
- void deregister(BaseFdWatcher *callback, int fd)
- {
- loop_mech.removeFdWatch(fd, callback->watch_flags);
-
- waitqueue_node<T_Mutex> qnode;
- getAttnLock(qnode);
-
- EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
- ed.issueDelete(callback);
-
- releaseLock(qnode);
- }
-
- void deregister(BaseBidiFdWatcher *callback, int fd)
- {
- if (LoopTraits::has_separate_rw_fd_watches) {
- // TODO
- }
- else {
- loop_mech.removeFdWatch(fd, callback->watch_flags);
- }
-
- waitqueue_node<T_Mutex> qnode;
- getAttnLock(qnode);
-
- EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
- ed.issueDelete(callback);
-
- releaseLock(qnode);
- }
-
- void reserveChildWatch(BaseChildWatcher *callBack)
- {
- loop_mech.reserveChildWatch();
- }
-
- void registerChild(BaseChildWatcher *callBack, pid_t child)
- {
- loop_mech.addChildWatch(child, callBack);
- }
-
- void registerReservedChild(BaseChildWatcher *callBack, pid_t child) noexcept
- {
- loop_mech.addReservedChildWatch(child, callBack);
- }
-
- void dequeueWatcher(BaseWatcher *watcher) noexcept
- {
- if (loop_mech.isQueued(watcher)) {
- loop_mech.dequeueWatcher(watcher);
- }
- }
-
- // Acquire the attention lock (when held, ensures that no thread is polling the AEN
- // mechanism).
- void getAttnLock(waitqueue_node<T_Mutex> &qnode)
- {
- std::unique_lock<T_Mutex> ulock(wait_lock);
- attn_waitqueue.queue(&qnode);
- if (attn_waitqueue.getHead() != &qnode) {
- loop_mech.interruptWait();
- while (attn_waitqueue.getHead() != &qnode) {
- qnode.wait(ulock);
- }
- }
- }
-
- // Acquire the poll-wait lock (to be held when polling the AEN mechanism; lower
- // priority than the attention lock).
- void getPollwaitLock(waitqueue_node<T_Mutex> &qnode)
- {
- std::unique_lock<T_Mutex> ulock(wait_lock);
- if (attn_waitqueue.getHead() == nullptr) {
- // Queue is completely empty:
- attn_waitqueue.queue(&qnode);
- }
- else {
- wait_waitqueue.queue(&qnode);
- }
-
- while (attn_waitqueue.getHead() != &qnode) {
- qnode.wait(ulock);
- }
- }
-
- // Release the poll-wait/attention lock.
- void releaseLock(waitqueue_node<T_Mutex> &qnode)
- {
- std::unique_lock<T_Mutex> ulock(wait_lock);
- waitqueue_node<T_Mutex> * nhead = attn_waitqueue.unqueue();
- if (nhead != nullptr) {
- nhead->signal();
- }
- else {
- nhead = wait_waitqueue.getHead();
- if (nhead != nullptr) {
- attn_waitqueue.queue(nhead);
- nhead->signal();
- }
- }
- }
-
- void processSignalRearm(BaseSignalWatcher * bsw, Rearm rearmType)
- {
- // Called with lock held
- if (rearmType == Rearm::REARM) {
- loop_mech.rearmSignalWatch_nolock(bsw->siginfo.get_signo());
- }
- else if (rearmType == Rearm::REMOVE) {
- loop_mech.removeSignalWatch_nolock(bsw->siginfo.get_signo());
- }
- }
-
- Rearm processFdRearm(BaseFdWatcher * bfw, Rearm rearmType, bool is_multi_watch)
- {
- // Called with lock held
- if (is_multi_watch) {
- BaseBidiFdWatcher * bdfw = static_cast<BaseBidiFdWatcher *>(bfw);
-
- if (rearmType == Rearm::REMOVE) {
- bdfw->read_removed = 1;
- bdfw->watch_flags &= ~in_events;
-
- if (! LoopTraits::has_separate_rw_fd_watches) {
- if (! bdfw->write_removed) {
- return Rearm::NOOP;
- }
- else {
- // both removed: actually remove
- loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */);
- return Rearm::REMOVE;
- }
- }
- else {
- // TODO this will need flags for such a loop, since it can't
- // otherwise distinguish which channel watch to remove
- loop_mech.removeFdWatch_nolock(bdfw->watch_fd, bdfw->watch_flags);
- }
- }
- else if (rearmType == Rearm::DISARM) {
- // Nothing more to do
- }
- else if (rearmType == Rearm::REARM) {
- bdfw->watch_flags |= in_events;
- if (! LoopTraits::has_separate_rw_fd_watches) {
- loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
- static_cast<BaseWatcher *>(bdfw),
- (bdfw->watch_flags & (in_events | out_events)) | one_shot);
- }
- else {
- loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
- static_cast<BaseWatcher *>(bdfw),
- in_events | one_shot);
- }
- }
- return rearmType;
- }
- else {
- if (rearmType == Rearm::REARM) {
- loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw,
- (bfw->watch_flags & (in_events | out_events)) | one_shot);
- }
- else if (rearmType == Rearm::REMOVE) {
- loop_mech.removeFdWatch_nolock(bfw->watch_fd, bfw->watch_flags);
- }
- return rearmType;
- }
- }
-
- Rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, Rearm rearmType)
- {
- // Called with lock held
- if (rearmType == Rearm::REMOVE) {
- bdfw->write_removed = 1;
- bdfw->watch_flags &= ~out_events;
-
- if (LoopTraits::has_separate_rw_fd_watches) {
- // TODO this will need flags for such a loop, since it can't
- // otherwise distinguish which channel watch to remove
- loop_mech.removeFdWatch_nolock(bdfw->watch_fd, bdfw->watch_flags);
- return bdfw->read_removed ? Rearm::REMOVE : Rearm::NOOP;
- }
- else {
- if (! bdfw->read_removed) {
- return Rearm::NOOP;
- }
- else {
- // both removed: actually remove
- loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */);
- return Rearm::REMOVE;
- }
- }
- }
- else if (rearmType == Rearm::DISARM) {
- // Nothing more to do
- }
- else if (rearmType == Rearm::REARM) {
- bdfw->watch_flags |= out_events;
- if (! LoopTraits::has_separate_rw_fd_watches) {
- loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
- static_cast<BaseWatcher *>(bdfw),
- (bdfw->watch_flags & (in_events | out_events)) | one_shot);
- }
- else {
- loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
- static_cast<BaseWatcher *>(bdfw),
- out_events | one_shot);
- }
- }
- return rearmType;
- }
-
- bool processEvents() noexcept
- {
- EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
- ed.lock.lock();
-
- // So this pulls *all* currently pending events and processes them in the current thread.
- // That's probably good for throughput, but maybe the behavior should be configurable.
-
- BaseWatcher * pqueue = ed.pullEvent();
- bool active = false;
-
- while (pqueue != nullptr) {
-
- pqueue->active = true;
- active = true;
-
- Rearm rearmType = Rearm::NOOP;
- bool is_multi_watch = false;
- BaseBidiFdWatcher *bbfw = nullptr;
-
- // (Above variables are initialised only to silence compiler warnings).
-
- // Read/manipulate watch_flags (if necessary) *before* we release the lock:
- if (pqueue->watchType == WatchType::FD) {
- BaseFdWatcher *bfw = static_cast<BaseFdWatcher *>(pqueue);
- bbfw = static_cast<BaseBidiFdWatcher *>(bfw);
- is_multi_watch = bfw->watch_flags & dprivate::multi_watch;
- if (! LoopTraits::has_separate_rw_fd_watches && is_multi_watch) {
- // Clear the input watch flags to avoid enabling read watcher while active:
- bfw->watch_flags &= ~in_events;
- }
- }
- else if (pqueue->watchType == WatchType::SECONDARYFD) {
- is_multi_watch = true;
- char * rp = (char *)pqueue;
- rp -= offsetof(BaseBidiFdWatcher, outWatcher);
- bbfw = (BaseBidiFdWatcher *)rp;
- if (! LoopTraits::has_separate_rw_fd_watches) {
- bbfw->watch_flags &= ~out_events;
- }
- }
-
- ed.lock.unlock();
-
- // Note that we select actions based on the type of the watch, as determined by the watchType
- // member. In some ways this screams out for polmorphism; a virtual function could be overridden
- // by each of the watcher types. I've instead used switch/case because I think it will perform
- // slightly better without the overhead of a virtual function dispatch, but it's got to be a
- // close call; I might be guilty of premature optimisation here.
-
- switch (pqueue->watchType) {
- case WatchType::SIGNAL: {
- BaseSignalWatcher *bsw = static_cast<BaseSignalWatcher *>(pqueue);
- rearmType = bsw->gotSignal(this, bsw->siginfo.get_signo(), bsw->siginfo);
- break;
- }
- case WatchType::FD: {
- BaseFdWatcher *bfw = static_cast<BaseFdWatcher *>(pqueue);
- if (is_multi_watch) {
- // The primary watcher for a multi-watch watcher is queued for
- // read events.
- rearmType = bbfw->readReady(this, bfw->watch_fd);
- }
- else {
- rearmType = bfw->gotEvent(this, bfw->watch_fd, bfw->event_flags);
- }
- break;
- }
- case WatchType::CHILD: {
- BaseChildWatcher *bcw = static_cast<BaseChildWatcher *>(pqueue);
- bcw->gotTermStat(this, bcw->watch_pid, bcw->child_status);
- // Child watches automatically remove:
- // TODO what if they want to return REMOVED...
- rearmType = Rearm::REMOVE;
- break;
- }
- case WatchType::SECONDARYFD: {
- // first construct a pointer to the main watcher:
- rearmType = bbfw->writeReady(this, bbfw->watch_fd);
- break;
- }
- default: ;
- }
-
- ed.lock.lock();
-
- // (if REMOVED, we must not touch pqueue at all)
- if (rearmType != Rearm::REMOVED) {
-
- pqueue->active = false;
- if (pqueue->deleteme) {
- // We don't want a watch that is marked "deleteme" to re-arm itself.
- rearmType = Rearm::REMOVE;
- }
- switch (pqueue->watchType) {
- case WatchType::SIGNAL:
- processSignalRearm(static_cast<BaseSignalWatcher *>(pqueue), rearmType);
- break;
- case WatchType::FD:
- rearmType = processFdRearm(static_cast<BaseFdWatcher *>(pqueue), rearmType, is_multi_watch);
- break;
- case WatchType::SECONDARYFD:
- rearmType = processSecondaryRearm(bbfw, rearmType);
- break;
- default: ;
- }
-
- if (pqueue->deleteme || rearmType == Rearm::REMOVE) {
- ed.lock.unlock();
- (is_multi_watch ? bbfw : pqueue)->watchRemoved();
- ed.lock.lock();
- }
- }
-
- pqueue = ed.pullEvent();
- }
-
- ed.lock.unlock();
- return active;
- }
-
-
- public:
- void run() noexcept
- {
- while (! processEvents()) {
- waitqueue_node<T_Mutex> qnode;
-
- // We only allow one thread to poll the mechanism at any time, since otherwise
- // removing event watchers is a nightmare beyond comprehension.
- getPollwaitLock(qnode);
-
- // Pull events from the AEN mechanism and insert them in our internal queue:
- loop_mech.pullEvents(true);
-
- // Now release the wait lock:
- releaseLock(qnode);
- }
- }
-};
-
-
-typedef EventLoop<NullMutex> NEventLoop;
-typedef EventLoop<std::mutex> TEventLoop;
-
-// from dasync.cc:
-TEventLoop & getSystemLoop();
-
-// Posix signal event watcher
-template <typename T_Mutex>
-class PosixSignalWatcher : private dprivate::BaseSignalWatcher<T_Mutex>
-{
- using BaseWatcher = dprivate::BaseWatcher;
-
-public:
- using SigInfo_p = typename dprivate::BaseSignalWatcher<T_Mutex>::SigInfo_p;
-
- // Register this watcher to watch the specified signal.
- // If an attempt is made to register with more than one event loop at
- // a time, behaviour is undefined.
- inline void registerWatch(EventLoop<T_Mutex> *eloop, int signo)
- {
- BaseWatcher::init();
- this->siginfo.set_signo(signo);
- eloop->registerSignal(this, signo);
- }
-
- inline void deregisterWatch(EventLoop<T_Mutex> *eloop) noexcept
- {
- eloop->deregister(this, this->siginfo.get_signo());
- }
-
- // virtual Rearm gotSignal(EventLoop<T_Mutex> *, int signo, SigInfo_p info) = 0;
-};
-
-// Posix file descriptor event watcher
-template <typename T_Mutex>
-class PosixFdWatcher : private dprivate::BaseFdWatcher<T_Mutex>
-{
- using BaseWatcher = dprivate::BaseWatcher;
-
- protected:
-
- // Set the types of event to watch. Only supported if LoopTraits::has_bidi_fd_watch
- // is true; otherwise has unspecified behavior.
- // Only safe to call from within the callback handler (gotEvent). Might not take
- // effect until the current callback handler returns with REARM.
- void setWatchFlags(int newFlags)
- {
- this->watch_flags = newFlags;
- }
-
- public:
-
- // Register a file descriptor watcher with an event loop. Flags
- // can be any combination of dasync::in_events / dasync::out_events.
- // Exactly one of in_events/out_events must be specified if the event
- // loop does not support bi-directional fd watchers.
- //
- // Mechanisms supporting dual watchers allow for two watchers for a
- // single file descriptor (one watching read status and the other
- // write status). Others mechanisms support only a single watcher
- // per file descriptor. Adding a watcher beyond what is supported
- // causes undefined behavior.
- //
- // Can fail with std::bad_alloc or std::system_error.
- void registerWith(EventLoop<T_Mutex> *eloop, int fd, int flags)
- {
- BaseWatcher::init();
- this->watch_fd = fd;
- this->watch_flags = flags;
- eloop->registerFd(this, fd, flags);
- }
-
- // Deregister a file descriptor watcher.
- //
- // If other threads may be polling the event loop, it is not safe to assume
- // the watcher is unregistered until the watchRemoved() callback is issued
- // (which will not occur until the event handler returns, if it is active).
- // In a single threaded environment, it is safe to delete the watcher after
- // calling this method as long as the handler (if it is active) accesses no
- // internal state and returns Rearm::REMOVED.
- // TODO: implement REMOVED, or correct above statement.
- void deregisterWatch(EventLoop<T_Mutex> *eloop) noexcept
- {
- eloop->deregister(this, this->watch_fd);
- }
-
- void setEnabled(EventLoop<T_Mutex> *eloop, bool enable) noexcept
- {
- std::lock_guard<T_Mutex> guard(eloop->getBaseLock());
- eloop->setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable);
- if (! enable) {
- eloop->dequeueWatcher(this);
- }
- }
-
- // virtual Rearm gotEvent(EventLoop<T_Mutex> *, int fd, int flags) = 0;
-};
-
-// A Bi-directional file descriptor watcher with independent read- and write- channels.
-// This watcher type has two event notification methods which can both potentially be
-// active at the same time.
-template <typename T_Mutex>
-class PosixBidiFdWatcher : private dprivate::BaseBidiFdWatcher<T_Mutex>
-{
- using BaseWatcher = dprivate::BaseWatcher;
-
- void setWatchEnabled(EventLoop<T_Mutex> *eloop, bool in, bool b)
- {
- int events = in ? in_events : out_events;
-
- if (b) {
- this->watch_flags |= events;
- }
- else {
- this->watch_flags &= ~events;
- }
- if (LoopTraits::has_separate_rw_fd_watches) {
- dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher;
- eloop->setFdEnabled_nolock(watcher, this->watch_fd, events | one_shot, b);
- if (! b) {
- eloop->dequeueWatcher(watcher);
- }
- }
- else {
- eloop->setFdEnabled_nolock(this, this->watch_fd,
- (this->watch_flags & (in_events | out_events)) | one_shot,
- (this->watch_flags & (in_events | out_events)) != 0);
- if (! b) {
- dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher;
- eloop->dequeueWatcher(watcher);
- }
- }
- }
-
- protected:
-
- // TODO if a watch is disabled and currently queued, we should de-queue it.
-
- void setInWatchEnabled(EventLoop<T_Mutex> *eloop, bool b) noexcept
- {
- eloop->getBaseLock().lock();
- setWatchEnabled(eloop, true, b);
- eloop->getBaseLock().unlock();
- }
-
- void setOutWatchEnabled(EventLoop<T_Mutex> *eloop, bool b) noexcept
- {
- eloop->getBaseLock().lock();
- setWatchEnabled(eloop, false, b);
- eloop->getBaseLock().unlock();
- }
-
- // Set the watch flags, which enables/disables both the in-watch and the out-watch accordingly.
- //
- // Concurrency: this method can only be called if
- // - it does not enable a watcher that might currently be active
- /// - unless the event loop will not be polled while the watcher is active.
- // (i.e. it is ok to call setWatchFlags from within the readReady/writeReady handlers if no other
- // thread will poll the event loop; it is ok to *dis*able a watcher that might be active).
- void setWatchFlags(EventLoop<T_Mutex> * eloop, int newFlags)
- {
- std::lock_guard<T_Mutex> guard(eloop->getBaseLock());
- if (LoopTraits::has_separate_rw_fd_watches) {
- setWatchEnabled(eloop, true, (newFlags & in_events) != 0);
- setWatchEnabled(eloop, false, (newFlags & out_events) != 0);
- }
- else {
- this->watch_flags = (this->watch_flags & ~IO_EVENTS) | newFlags;
- eloop->setFdEnabled((dprivate::BaseWatcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true);
- }
- }
-
- public:
-
- // Register a bi-direction file descriptor watcher with an event loop. Flags
- // can be any combination of dasync::in_events / dasync::out_events.
- //
- // Can fail with std::bad_alloc or std::system_error.
- void registerWith(EventLoop<T_Mutex> *eloop, int fd, int flags)
- {
- BaseWatcher::init();
- this->outWatcher.BaseWatcher::init();
- this->watch_fd = fd;
- this->watch_flags = flags | dprivate::multi_watch;
- eloop->registerFd(this, fd, flags);
- }
-
- // Deregister a bi-direction file descriptor watcher.
- //
- // If other threads may be polling the event loop, it is not safe to assume
- // the watcher is unregistered until the watchRemoved() callback is issued
- // (which will not occur until the event handler returns, if it is active).
- // In a single threaded environment, it is safe to delete the watcher after
- // calling this method as long as the handler (if it is active) accesses no
- // internal state and returns Rearm::REMOVED.
- // TODO: implement REMOVED, or correct above statement.
- void deregisterWatch(EventLoop<T_Mutex> *eloop) noexcept
- {
- eloop->deregister(this, this->watch_fd);
- }
-
- // Rearm readReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
- // Rearm writeReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
-};
-
-// Posix child process event watcher
-template <typename T_Mutex>
-class PosixChildWatcher : private dprivate::BaseChildWatcher<T_Mutex>
-{
- using BaseWatcher = dprivate::BaseWatcher;
-
- public:
- // Reserve resources for a child watcher with the given event loop.
- // Reservation can fail with std::bad_alloc.
- void reserveWith(EventLoop<T_Mutex> *eloop)
- {
- eloop->reserveChildWatch();
- }
-
- // Register a watcher for the given child process with an event loop.
- // Registration can fail with std::bad_alloc.
- void registerWith(EventLoop<T_Mutex> *eloop, pid_t child)
- {
- BaseWatcher::init();
- this->watch_pid = child;
- eloop->registerChild(this, child);
- }
-
- // Register a watcher for the given child process with an event loop,
- // after having reserved resources previously (using reserveWith).
- // Registration cannot fail.
- void registerReserved(EventLoop<T_Mutex> *eloop, pid_t child) noexcept
- {
- BaseWatcher::init();
- eloop->registerReservedChild(this, child);
- }
-
- // virtual void gotTermStat(EventLoop<T_Mutex> *, pid_t child, int status) = 0;
-};
-
-} // namespace dasync
-
-#endif
+++ /dev/null
-#ifndef D_MUTEX_H_INCLUDED
-#define D_MUTEX_H_INCLUDED
-
-//#include <pthread.h>
-#include <mutex>
-
-namespace dasync {
-
-// Simple non-recursive mutex, with priority inheritance to avoid priority inversion.
-/*
-class DMutex
-{
- private:
- pthread_mutex_t mutex;
-
- public:
- DMutex()
- {
- // Avoid priority inversion by using PTHREAD_PRIO_INHERIT
- pthread_mutexattr_t attribs;
- pthread_mutexattr_init(&attribs);
- pthread_mutexattr_setprotocol(&attribs, PTHREAD_PRIO_INHERIT);
- pthread_mutex_init(&mutex, &attribs);
- }
-
- void lock()
- {
- pthread_mutex_lock(&mutex);
- }
-
- void unlock()
- {
- pthread_mutex_unlock(&mutex);
- }
-};
-*/
-
-using DMutex = std::mutex;
-
-// A "null" mutex, for which locking / unlocking actually does nothing.
-class NullMutex
-{
- #ifdef __GNUC__
- #ifndef __clang__
- char empty[0]; // Make class instances take up no space (gcc)
- #else
- char empty[0] __attribute__((unused)); // Make class instances take up no space (clang)
- #endif
- #endif
-
- public:
- void lock() { }
- void unlock() { }
- void try_lock() { }
-};
-
-
-} // end of namespace
-
-
-#endif
--- /dev/null
+namespace dasynq {
+
+// Map of pid_t to void *, with possibility of reserving entries so that mappings can
+// be later added with no danger of allocator exhaustion (bad_alloc).
+class pid_map
+{
+ using pair = std::pair<pid_t, void *>;
+ std::unordered_map<pid_t, void *> base_map;
+ std::vector<pair> backup_vector;
+
+ // Number of entries in backup_vector that are actually in use (as opposed
+ // to simply reserved):
+ int backup_size = 0;
+
+ public:
+ using entry = std::pair<bool, void *>;
+
+ entry get(pid_t key) noexcept
+ {
+ auto it = base_map.find(key);
+ if (it == base_map.end()) {
+ // Not in map; look in vector
+ for (int i = 0; i < backup_size; i++) {
+ if (backup_vector[i].first == key) {
+ return entry(true, backup_vector[i].second);
+ }
+ }
+
+ return entry(false, nullptr);
+ }
+
+ return entry(true, it->second);
+ }
+
+ entry erase(pid_t key) noexcept
+ {
+ auto iter = base_map.find(key);
+ if (iter != base_map.end()) {
+ entry r(true, iter->second);
+ base_map.erase(iter);
+ return r;
+ }
+ for (int i = 0; i < backup_size; i++) {
+ if (backup_vector[i].first == key) {
+ entry r(true, backup_vector[i].second);
+ backup_vector.erase(backup_vector.begin() + i);
+ return r;
+ }
+ }
+ return entry(false, nullptr);
+ }
+
+ // Throws bad_alloc on reservation failure
+ void reserve()
+ {
+ backup_vector.resize(backup_vector.size() + 1);
+ }
+
+ void add(pid_t key, void *val) // throws std::bad_alloc
+ {
+ base_map[key] = val;
+ }
+
+ void add_from_reserve(pid_t key, void *val) noexcept
+ {
+ try {
+ base_map[key] = val;
+ backup_vector.resize(backup_vector.size() - 1);
+ }
+ catch (std::bad_alloc &) {
+ // We couldn't add into the map, use the reserve:
+ backup_vector[backup_size++] = pair(key, val);
+ }
+ }
+};
+
+namespace {
+ void sigchld_handler(int signum)
+ {
+ // If SIGCHLD has no handler (is ignored), SIGCHLD signals will
+ // not be queued for terminated child processes. (On Linux, the
+ // default disposition for SIGCHLD is to be ignored but *not* have
+ // this behavior, which seems inconsistent. Setting a handler doesn't
+ // hurt in any case).
+ }
+}
+
+template <class Base> class ChildProcEvents : public Base
+{
+ private:
+ pid_map child_waiters;
+
+ using SigInfo = typename Base::SigInfo;
+
+ protected:
+ void receiveSignal(SigInfo &siginfo, void *userdata)
+ {
+ if (siginfo.get_signo() == SIGCHLD) {
+ int status;
+ pid_t child;
+ while ((child = waitpid(-1, &status, WNOHANG)) > 0) {
+ pid_map::entry ent = child_waiters.erase(child);
+ if (ent.first) {
+ Base::receiveChildStat(child, status, ent.second);
+ }
+ }
+ }
+ else {
+ Base::receiveSignal(siginfo, userdata);
+ }
+ }
+
+ public:
+ void reserveChildWatch()
+ {
+ child_waiters.reserve();
+ }
+
+ void addChildWatch(pid_t child, void *val)
+ {
+ child_waiters.add(child, val);
+ }
+
+ void addReservedChildWatch(pid_t child, void *val) noexcept
+ {
+ child_waiters.add_from_reserve(child, val);
+ }
+
+ template <typename T> void init(T *loop_mech)
+ {
+ loop_mech->addSignalWatch(SIGCHLD, nullptr);
+ struct sigaction chld_action;
+ chld_action.sa_handler = sigchld_handler;
+ sigemptyset(&chld_action.sa_mask);
+ chld_action.sa_flags = 0;
+ sigaction(SIGCHLD, &chld_action, nullptr);
+ }
+};
+
+
+} // end namespace
--- /dev/null
+#include <system_error>
+#include <mutex>
+#include <type_traits>
+#include <unordered_map>
+#include <vector>
+
+#include <sys/epoll.h>
+#include <sys/signalfd.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include <unistd.h>
+#include <signal.h>
+
+namespace dasynq {
+
+template <class Base> class EpollLoop;
+
+class EpollTraits
+{
+ template <class Base> friend class EpollLoop;
+
+ public:
+
+ class SigInfo
+ {
+ template <class Base> friend class EpollLoop;
+
+ struct signalfd_siginfo info;
+
+ public:
+ int get_signo() { return info.ssi_signo; }
+ int get_sicode() { return info.ssi_code; }
+ int get_siint() { return info.ssi_int; }
+ int get_ssiptr() { return info.ssi_ptr; }
+ int get_ssiaddr() { return info.ssi_addr; }
+
+ void set_signo(int signo) { info.ssi_signo = signo; }
+ };
+
+ class FD_r;
+
+ // File descriptor optional storage. If the mechanism can return the file descriptor, this
+ // class will be empty, otherwise it can hold a file descriptor.
+ class FD_s {
+ friend class FD_r;
+
+ // Epoll doesn't return the file descriptor (it can, but it can't return both file
+ // descriptor and user data).
+ int fd;
+ };
+
+ // File descriptor reference (passed to event callback). If the mechanism can return the
+ // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor
+ // must be stored in an FD_s instance.
+ class FD_r {
+ public:
+ int getFd(FD_s ss)
+ {
+ return ss.fd;
+ }
+ };
+
+ const static bool has_bidi_fd_watch = true;
+ const static bool has_separate_rw_fd_watches = false;
+};
+
+
+template <class Base> class EpollLoop : public Base
+{
+ int epfd; // epoll fd
+ int sigfd; // signalfd fd; -1 if not initialised
+ sigset_t sigmask;
+
+ std::unordered_map<int, void *> sigdataMap;
+
+ // Base contains:
+ // lock - a lock that can be used to protect internal structure.
+ // receive*() methods will be called with lock held.
+ // receiveSignal(SigInfo &, user *) noexcept
+ // receiveFdEvent(FD_r, user *, int flags) noexcept
+
+ using SigInfo = EpollTraits::SigInfo;
+ using FD_r = typename EpollTraits::FD_r;
+
+ void processEvents(epoll_event *events, int r)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
+ for (int i = 0; i < r; i++) {
+ void * ptr = events[i].data.ptr;
+
+ if (ptr == &sigfd) {
+ // Signal
+ SigInfo siginfo;
+ while (true) {
+ int r = read(sigfd, &siginfo.info, sizeof(siginfo.info));
+ if (r == -1) break;
+ if (siginfo.get_signo() != SIGCHLD) {
+ // TODO remove the special exception for SIGCHLD?
+ sigdelset(&sigmask, siginfo.get_signo());
+ }
+ auto iter = sigdataMap.find(siginfo.get_signo());
+ if (iter != sigdataMap.end()) {
+ void *userdata = (*iter).second;
+ Base::receiveSignal(siginfo, userdata);
+ }
+ }
+ signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
+ }
+ else {
+ int flags = 0;
+ (events[i].events & EPOLLIN) && (flags |= IN_EVENTS);
+ (events[i].events & EPOLLHUP) && (flags |= IN_EVENTS);
+ (events[i].events & EPOLLOUT) && (flags |= OUT_EVENTS);
+ (events[i].events & EPOLLERR) && (flags |= IN_EVENTS | OUT_EVENTS | ERR_EVENTS);
+ Base::receiveFdEvent(*this, FD_r(), ptr, flags);
+ }
+ }
+ }
+
+ public:
+
+ /**
+ * EpollLoop constructor.
+ *
+ * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
+ */
+ EpollLoop() : sigfd(-1)
+ {
+ epfd = epoll_create1(EPOLL_CLOEXEC);
+ if (epfd == -1) {
+ throw std::system_error(errno, std::system_category());
+ }
+ sigemptyset(&sigmask);
+ Base::init(this);
+ }
+
+ ~EpollLoop()
+ {
+ close(epfd);
+ if (sigfd != -1) {
+ close(sigfd);
+ }
+ }
+
+ // flags: IN_EVENTS | OUT_EVENTS
+ void addFdWatch(int fd, void *userdata, int flags)
+ {
+ struct epoll_event epevent;
+ // epevent.data.fd = fd;
+ epevent.data.ptr = userdata;
+ epevent.events = 0;
+
+ if (flags & ONE_SHOT) {
+ epevent.events = EPOLLONESHOT;
+ }
+ if (flags & IN_EVENTS) {
+ epevent.events |= EPOLLIN;
+ }
+ if (flags & OUT_EVENTS) {
+ epevent.events |= EPOLLOUT;
+ }
+
+ if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epevent) == -1) {
+ throw new std::system_error(errno, std::system_category());
+ }
+ }
+
+ void removeFdWatch(int fd, int flags) noexcept
+ {
+ epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr);
+ }
+
+ void removeFdWatch_nolock(int fd, int flags) noexcept
+ {
+ removeFdWatch(fd, flags);
+ }
+
+ // Note this will *replace* the old flags with the new, that is,
+ // it can enable *or disable* read/write events.
+ void enableFdWatch(int fd, void *userdata, int flags) noexcept
+ {
+ struct epoll_event epevent;
+ // epevent.data.fd = fd;
+ epevent.data.ptr = userdata;
+ epevent.events = 0;
+
+ if (flags & ONE_SHOT) {
+ epevent.events = EPOLLONESHOT;
+ }
+ if (flags & IN_EVENTS) {
+ epevent.events |= EPOLLIN;
+ }
+ if (flags & OUT_EVENTS) {
+ epevent.events |= EPOLLOUT;
+ }
+
+ if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) {
+ // Shouldn't be able to fail
+ // throw new std::system_error(errno, std::system_category());
+ }
+ }
+
+ void enableFdWatch_nolock(int fd, void *userdata, int flags)
+ {
+ enableFdWatch(fd, userdata, flags);
+ }
+
+ void disableFdWatch(int fd, int flags) noexcept
+ {
+ struct epoll_event epevent;
+ // epevent.data.fd = fd;
+ epevent.data.ptr = nullptr;
+ epevent.events = 0;
+
+ // Epoll documentation says that hangup will still be reported, need to check
+ // whether this is really the case. Suspect it is really only the case if
+ // EPOLLIN is set.
+ if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) {
+ // Let's assume that this can't fail.
+ // throw new std::system_error(errno, std::system_category());
+ }
+ }
+
+ void disableFdWatch_nolock(int fd, int flags) noexcept
+ {
+ disableFdWatch(fd, flags);
+ }
+
+ // Note signal should be masked before call.
+ void addSignalWatch(int signo, void *userdata)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
+ sigdataMap[signo] = userdata;
+
+ // Modify the signal fd to watch the new signal
+ bool was_no_sigfd = (sigfd == -1);
+ sigaddset(&sigmask, signo);
+ sigfd = signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
+ if (sigfd == -1) {
+ throw new std::system_error(errno, std::system_category());
+ }
+
+ if (was_no_sigfd) {
+ // Add the signalfd to the epoll set.
+ struct epoll_event epevent;
+ epevent.data.ptr = &sigfd;
+ epevent.events = EPOLLIN;
+ // No need for EPOLLONESHOT - we can pull the signals out
+ // as we see them.
+ if (epoll_ctl(epfd, EPOLL_CTL_ADD, sigfd, &epevent) == -1) {
+ close(sigfd);
+ throw new std::system_error(errno, std::system_category());
+ }
+ }
+ }
+
+ // Note, called with lock held:
+ void rearmSignalWatch_nolock(int signo) noexcept
+ {
+ sigaddset(&sigmask, signo);
+ signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
+ }
+
+ void removeSignalWatch_nolock(int signo) noexcept
+ {
+ sigdelset(&sigmask, signo);
+ signalfd(sigfd, &sigmask, 0);
+ }
+
+ void removeSignalWatch(int signo) noexcept
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ removeSignalWatch_nolock(signo);
+ }
+
+ // If events are pending, process an unspecified number of them.
+ // If no events are pending, wait until one event is received and
+ // process this event (and possibly any other events received
+ // simultaneously).
+ // If processing an event removes a watch, there is a possibility
+ // that the watched event will still be reported (if it has
+ // occurred) before pullEvents() returns.
+ //
+ // do_wait - if false, returns immediately if no events are
+ // pending.
+ void pullEvents(bool do_wait)
+ {
+ epoll_event events[16];
+ int r = epoll_wait(epfd, events, 16, do_wait ? -1 : 0);
+ if (r == -1 || r == 0) {
+ // signal or no events
+ return;
+ }
+
+ processEvents(events, r);
+ }
+
+ // If events are pending, process one of them.
+ // If no events are pending, wait until one event is received and
+ // process this event.
+ //
+ // do_wait - if false, returns immediately if no events are
+ // pending.
+ void pullOneEvent(bool do_wait)
+ {
+ epoll_event events[1];
+ int r = epoll_wait(epfd, events, 1, do_wait ? -1 : 0);
+ if (r == -1 || r == 0) {
+ // signal or no events
+ return;
+ }
+
+ processEvents(events, r);
+ }
+
+ // Interrupt any current poll operation (pullEvents/pullOneEvent), causing
+ // it to to return immediately.
+ void interruptWait()
+ {
+ // TODO
+ }
+};
+
+} // end namespace
--- /dev/null
+namespace dasynq {
+
+// Event type bits
+constexpr unsigned int IN_EVENTS = 1;
+constexpr unsigned int OUT_EVENTS = 2;
+constexpr unsigned int ERR_EVENTS = 4;
+
+constexpr unsigned int ONE_SHOT = 8;
+
+// Masks:
+constexpr unsigned int IO_EVENTS = IN_EVENTS | OUT_EVENTS;
+
+}
--- /dev/null
+#include <system_error>
+#include <mutex>
+#include <type_traits>
+#include <unordered_map>
+#include <vector>
+
+#ifdef __OpenBSD__
+#include <sys/signal.h> // for __thrsigdivert aka sigtimedwait
+#include <sys/syscall.h>
+extern "C" {
+ int __thrsigdivert(sigset_t set, siginfo_t *info, const struct timespec * timeout);
+}
+#endif
+
+#include <sys/event.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include <unistd.h>
+#include <signal.h>
+
+namespace dasynq {
+
+template <class Base> class KqueueLoop;
+
+class KqueueTraits
+{
+ template <class Base> friend class KqueueLoop;
+
+ public:
+
+ class SigInfo
+ {
+ template <class Base> friend class KqueueLoop;
+
+ siginfo_t info;
+
+ public:
+ int get_signo() { return info.si_signo; }
+ int get_sicode() { return info.si_code; }
+ char * get_ssiaddr() { return info.si_addr; }
+
+ void set_signo(int signo) { info.si_signo = signo; }
+ };
+
+ class FD_r;
+
+ // File descriptor optional storage. If the mechanism can return the file descriptor, this
+ // class will be empty, otherwise it can hold a file descriptor.
+ class FD_s {
+ // Epoll doesn't return the file descriptor (it can, but it can't return both file
+ // descriptor and user data).
+ // TODO make true empty.
+ };
+
+ // File descriptor reference (passed to event callback). If the mechanism can return the
+ // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor
+ // must be stored in an FD_s instance.
+ class FD_r {
+ int fd;
+ public:
+ int getFd(FD_s ss)
+ {
+ return fd;
+ }
+ FD_r(int nfd) : fd(nfd)
+ {
+ }
+ };
+
+ const static bool has_bidi_fd_watch = false;
+ const static bool has_separate_rw_fd_watches = true;
+};
+
+#if defined(__OpenBSD__)
+// OpenBSD has no sigtimedwait (or sigwaitinfo) but does have "__thrsigdivert", which is
+// essentially an incomplete version of the same thing. Discussion with OpenBSD developer
+// Ted Unangst suggested that the siginfo_t structure returned might not always have all
+// fields set correctly. Furthermore there is a bug such that specifying a zero timeout (or
+// indeed any timeout less than a tick) results in NO timeout.
+static inline int sigtimedwait(const sigset_t *ssp, siginfo_t *info, struct timespec *timeout)
+{
+ // We know that we're only called with a timeout of 0 (which doesn't work properly) and
+ // that we safely overwrite the timeout. So, we set tv_nsec to an invalid value, which
+ // will cause EINVAL to be returned, but will still pick up any pending signals *first*.
+ timeout->tv_nsec = 1000000001;
+ return __thrsigdivert(*ssp, info, timeout);
+}
+#endif
+
+template <class Base> class KqueueLoop : public Base
+{
+ int kqfd; // kqueue fd
+ sigset_t sigmask; // enabled signal watch mask
+
+ // Map of signal number to user data pointer. If kqueue had been better thought-through,
+ // we shouldn't need this. Although we can associate user data with an EVFILT_SIGNAL kqueue
+ // filter, the problem is that the kqueue signal report *coexists* with the regular signal
+ // delivery mechanism without having any connection to it. Whereas regular signals can be
+ // queued (especially "realtime" signals, via sigqueue()), kqueue just maintains a counter
+ // of delivery attempts and clears this when we read the event. What this means is that
+ // kqueue won't necessarily tell us if signals are pending, in the case that:
+ // 1) it already reported the attempted signal delivery and
+ // 2) more than one of the same signal was pending at that time and
+ // 3) no more deliveries of the same signal have been attempted in the meantime.
+ // Of course, if kqueue doesn't report the signal, then it doesn't give us the data associated
+ // with the event, so we need to maintain that separately too:
+ std::unordered_map<int, void *> sigdataMap;
+
+ // Base contains:
+ // lock - a lock that can be used to protect internal structure.
+ // receive*() methods will be called with lock held.
+ // receiveSignal(SigInfo &, user *) noexcept
+ // receiveFdEvent(FD_r, user *, int flags) noexcept
+
+ using SigInfo = KqueueTraits::SigInfo;
+ using FD_r = typename KqueueTraits::FD_r;
+
+ void processEvents(struct kevent *events, int r)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
+ for (int i = 0; i < r; i++) {
+ if (events[i].filter == EVFILT_SIGNAL) {
+ SigInfo siginfo;
+ sigset_t sset;
+ sigemptyset(&sset);
+ sigaddset(&sset, events[i].ident);
+ struct timespec timeout;
+ timeout.tv_sec = 0;
+ timeout.tv_nsec = 0;
+ if (sigtimedwait(&sset, &siginfo.info, &timeout) > 0) {
+ Base::receiveSignal(siginfo, (void *)events[i].udata);
+ }
+
+ if (events[i].ident != SIGCHLD) {
+ sigdelset(&sigmask, events[i].ident);
+ events[i].flags = EV_DISABLE;
+ }
+ else {
+ // TODO can we remove this SIGCHLD hack?
+ events[i].flags = EV_ENABLE;
+ }
+ }
+ else if (events[i].filter == EVFILT_READ || events[i].filter == EVFILT_WRITE) {
+ int flags = events[i].filter == EVFILT_READ ? IN_EVENTS : OUT_EVENTS;
+ Base::receiveFdEvent(*this, FD_r(events[i].ident), events[i].udata, flags);
+ events[i].flags = EV_DISABLE | EV_CLEAR;
+ // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for
+ // another connection).
+ }
+ else {
+ events[i].flags = EV_DISABLE;
+ }
+ }
+
+ // Now we disable all received events, to simulate EV_DISPATCH:
+ kevent(kqfd, events, r, nullptr, 0, nullptr);
+ }
+
+ public:
+
+ /**
+ * KqueueLoop constructor.
+ *
+ * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
+ */
+ KqueueLoop()
+ {
+ kqfd = kqueue();
+ if (kqfd == -1) {
+ throw std::system_error(errno, std::system_category());
+ }
+ sigemptyset(&sigmask);
+ Base::init(this);
+ }
+
+ ~KqueueLoop()
+ {
+ close(kqfd);
+ }
+
+ void setFilterEnabled(short filterType, uintptr_t ident, bool enable)
+ {
+ struct kevent kev;
+ EV_SET(&kev, ident, filterType, enable ? EV_ENABLE : EV_DISABLE, 0, 0, 0);
+ kevent(kqfd, &kev, 1, nullptr, 0, nullptr);
+ }
+
+ void removeFilter(short filterType, uintptr_t ident)
+ {
+ struct kevent kev;
+ EV_SET(&kev, ident, filterType, EV_DELETE, 0, 0, 0);
+ kevent(kqfd, &kev, 1, nullptr, 0, nullptr);
+ }
+
+ // flags: IN_EVENTS | OUT_EVENTS
+ void addFdWatch(int fd, void *userdata, int flags)
+ {
+ // TODO kqueue doesn't support EVFILE_WRITE on file fd's :/
+
+ short filter = (flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE;
+
+ struct kevent kev;
+ EV_SET(&kev, fd, filter, EV_ADD, 0, 0, userdata);
+ if (kevent(kqfd, &kev, 1, nullptr, 0, nullptr) == -1) {
+ throw new std::system_error(errno, std::system_category());
+ }
+ }
+
+ void removeFdWatch(int fd, int flags)
+ {
+ removeFilter((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd);
+ }
+
+ void removeFdWatch_nolock(int fd, int flags)
+ {
+ removeFdWatch(fd, flags);
+ }
+
+ void enableFdWatch(int fd, void *userdata, int flags)
+ {
+ setFilterEnabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, true);
+ }
+
+ void enableFdWatch_nolock(int fd, void *userdata, int flags)
+ {
+ enableFdWatch(fd, userdata, flags);
+ }
+
+ void disableFdWatch(int fd, int flags)
+ {
+ setFilterEnabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, false);
+ }
+
+ void disableFdWatch_nolock(int fd, int flags)
+ {
+ disableFdWatch(fd, flags);
+ }
+
+ // Note signal should be masked before call.
+ void addSignalWatch(int signo, void *userdata)
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
+ sigdataMap[signo] = userdata;
+ sigaddset(&sigmask, signo);
+
+ struct kevent evt;
+ EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ADD, 0, 0, userdata);
+ // TODO use EV_DISPATCH if available (not on OpenBSD)
+
+ if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) {
+ throw new std::system_error(errno, std::system_category());
+ }
+ }
+
+ // Note, called with lock held:
+ void rearmSignalWatch_nolock(int signo) noexcept
+ {
+ sigaddset(&sigmask, signo);
+
+ struct kevent evt;
+ EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ENABLE, 0, 0, 0);
+ // TODO use EV_DISPATCH if available (not on OpenBSD)
+
+ kevent(kqfd, &evt, 1, nullptr, 0, nullptr);
+ }
+
+ void removeSignalWatch_nolock(int signo) noexcept
+ {
+ sigdelset(&sigmask, signo);
+
+ struct kevent evt;
+ EV_SET(&evt, signo, EVFILT_SIGNAL, EV_DELETE, 0, 0, 0);
+
+ kevent(kqfd, &evt, 1, nullptr, 0, nullptr);
+ }
+
+ void removeSignalWatch(int signo) noexcept
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ removeSignalWatch_nolock(signo);
+ }
+
+ // If events are pending, process an unspecified number of them.
+ // If no events are pending, wait until one event is received and
+ // process this event (and possibly any other events received
+ // simultaneously).
+ // If processing an event removes a watch, there is a possibility
+ // that the watched event will still be reported (if it has
+ // occurred) before pullEvents() returns.
+ //
+ // do_wait - if false, returns immediately if no events are
+ // pending.
+ void pullEvents(bool do_wait)
+ {
+ // We actually need to check pending signals, since
+ // kqueue can count signals as they are delivered but the count is
+ // cleared when we poll the kqueue, meaning that signals might still
+ // be pending if they were queued multiple times at the last poll.
+
+ // TODO we should only poll for signals that *have* been reported
+ // as being raised more than once prior via kevent, rather than all
+ // signals that have been registered - in many cases that will allow
+ // us to skip the sigtimedwait call altogether.
+
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ struct timespec timeout;
+ timeout.tv_sec = 0;
+ timeout.tv_nsec = 0;
+ SigInfo siginfo;
+ int rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout);
+ while (rsigno > 0) {
+ // TODO avoid this hack for SIGCHLD somehow
+ if (rsigno != SIGCHLD) {
+ sigdelset(&sigmask, rsigno);
+ // TODO accumulate and disable multiple filters with a single kevents call
+ // rather than disabling each individually
+ setFilterEnabled(EVFILT_SIGNAL, rsigno, false);
+ }
+ Base::receiveSignal(siginfo, sigdataMap[rsigno]);
+ rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout);
+ }
+ }
+
+ struct kevent events[16];
+ struct timespec ts;
+ ts.tv_sec = 0;
+ ts.tv_nsec = 0;
+ int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts);
+ if (r == -1 || r == 0) {
+ // signal or no events
+ return;
+ }
+
+ processEvents(events, r);
+ }
+
+ // If events are pending, process one of them.
+ // If no events are pending, wait until one event is received and
+ // process this event.
+ //
+ // do_wait - if false, returns immediately if no events are
+ // pending.
+ void pullOneEvent(bool do_wait)
+ {
+ // TODO must check for pending signals as per pullEvents()
+ struct kevent events[1];
+ struct timespec ts;
+ ts.tv_sec = 0;
+ ts.tv_nsec = 0;
+ int r = kevent(kqfd, nullptr, 0, events, 1, do_wait ? nullptr : &ts);
+ if (r == -1 || r == 0) {
+ // signal or no events
+ return;
+ }
+
+ processEvents(events, r);
+ }
+
+ // Interrupt any current poll operation (pullEvents/pullOneEvent), causing
+ // it to to return immediately.
+ void interruptWait()
+ {
+ // TODO
+ }
+};
+
+} // end namespace
--- /dev/null
+#ifndef D_MUTEX_H_INCLUDED
+#define D_MUTEX_H_INCLUDED
+
+//#include <pthread.h>
+#include <mutex>
+
+namespace dasynq {
+
+// Simple non-recursive mutex, with priority inheritance to avoid priority inversion.
+/*
+class DMutex
+{
+ private:
+ pthread_mutex_t mutex;
+
+ public:
+ DMutex()
+ {
+ // Avoid priority inversion by using PTHREAD_PRIO_INHERIT
+ pthread_mutexattr_t attribs;
+ pthread_mutexattr_init(&attribs);
+ pthread_mutexattr_setprotocol(&attribs, PTHREAD_PRIO_INHERIT);
+ pthread_mutex_init(&mutex, &attribs);
+ }
+
+ void lock()
+ {
+ pthread_mutex_lock(&mutex);
+ }
+
+ void unlock()
+ {
+ pthread_mutex_unlock(&mutex);
+ }
+};
+*/
+
+using DMutex = std::mutex;
+
+// A "null" mutex, for which locking / unlocking actually does nothing.
+class NullMutex
+{
+ #ifdef __GNUC__
+ #ifndef __clang__
+ char empty[0]; // Make class instances take up no space (gcc)
+ #else
+ char empty[0] __attribute__((unused)); // Make class instances take up no space (clang)
+ #endif
+ #endif
+
+ public:
+ void lock() { }
+ void unlock() { }
+ void try_lock() { }
+};
+
+
+} // end of namespace
+
+
+#endif
--- /dev/null
+#ifndef DASYNC_H_INCLUDED
+#define DASYNC_H_INCLUDED
+
+#if defined(__OpenBSD__)
+#define HAVE_KQUEUE 1
+#endif
+
+#if defined(__linux__)
+#define HAVE_EPOLL 1
+#endif
+
+#include "dasynq-flags.h"
+
+#if defined(HAVE_KQUEUE)
+#include "dasynq-kqueue.h"
+#include "dasynq-childproc.h"
+namespace dasynq {
+ template <typename T> using Loop = KqueueLoop<T>;
+ using LoopTraits = KqueueTraits;
+}
+#elif defined(HAVE_EPOLL)
+#include "dasynq-epoll.h"
+#include "dasynq-childproc.h"
+namespace dasynq {
+ template <typename T> using Loop = EpollLoop<T>;
+ using LoopTraits = EpollTraits;
+}
+#endif
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <cstddef>
+
+#include "dasynq-mutex.h"
+
+
+
+// TODO consider using atomic variables instead of explicit locking where appropriate
+
+// Allow optimisation of empty classes by including this in the body:
+// May be included as the last entry for a class which is only
+// _potentially_ empty.
+
+/*
+#ifdef __GNUC__
+#ifdef __clang__
+#define EMPTY_BODY private: char empty_fill[0];
+#else
+#define EMPTY_BODY private: char empty_fill[0];
+#endif
+#else
+#define EMPTY_BODY
+#endif
+*/
+
+namespace dasynq {
+
+
+/**
+ * Values for rearm/disarm return from event handlers
+ */
+enum class Rearm
+{
+ /** Re-arm the event watcher so that it receives further events */
+ REARM,
+ /** Disarm the event watcher so that it receives no further events, until it is re-armed explicitly */
+ DISARM,
+ /** Leave in current armed/disarmed state */
+ NOOP,
+ /** Remove the event watcher (and call "removed" callback) */
+ REMOVE,
+ /** The watcher has been removed - don't touch it! */
+ REMOVED
+// TODO: add a REQUEUE option, which means, "I didn't complete input/output, run me again soon"
+};
+
+
+// Forward declarations:
+template <typename T_Mutex> class EventLoop;
+template <typename T_Mutex> class FdWatcher;
+template <typename T_Mutex> class BidiFdWatcher;
+template <typename T_Mutex> class SignalWatcher;
+template <typename T_Mutex> class ChildProcWatcher;
+
+// Information about a received signal.
+// This is essentially a wrapper for the POSIX siginfo_t; its existence allows for mechanisms that receive
+// equivalent signal information in a different format (eg signalfd on Linux).
+using SigInfo = LoopTraits::SigInfo;
+
+namespace dprivate {
+ // (non-public API)
+
+ enum class WatchType
+ {
+ SIGNAL,
+ FD,
+ CHILD,
+ SECONDARYFD
+ };
+
+ template <typename T_Mutex, typename Traits> class EventDispatch;
+
+ // For FD watchers:
+ // Use this watch flag to indicate that in and out events should be reported separately,
+ // that is, watcher should not be disabled until all watched event types are queued.
+ constexpr static int multi_watch = 4;
+
+ // Represents a queued event notification
+ class BaseWatcher
+ {
+ template <typename T_Mutex, typename Traits> friend class EventDispatch;
+ template <typename T_Mutex> friend class dasynq::EventLoop;
+
+ protected:
+ WatchType watchType;
+ int active : 1;
+ int deleteme : 1;
+
+ BaseWatcher * prev;
+ BaseWatcher * next;
+
+ public:
+
+ // Perform initialisation necessary before registration with an event loop
+ void init()
+ {
+ active = false;
+ deleteme = false;
+ prev = nullptr;
+ next = nullptr;
+ }
+
+ BaseWatcher(WatchType wt) noexcept : watchType(wt) { }
+
+ virtual ~BaseWatcher() noexcept { }
+
+ // Called when the watcher has been removed.
+ // It is guaranteed by the caller that:
+ // - the dispatch method is not currently running
+ // - the dispatch method will not be called.
+ virtual void watchRemoved() noexcept
+ {
+ // TODO this "delete" behaviour could be dependent on a flag, perhaps?
+ // delete this;
+ }
+ };
+
+ // Base signal event - not part of public API
+ template <typename T_Mutex>
+ class BaseSignalWatcher : public BaseWatcher
+ {
+ template <typename M, typename Traits> friend class EventDispatch;
+ friend class dasynq::EventLoop<T_Mutex>;
+
+ protected:
+ SigInfo siginfo;
+ BaseSignalWatcher() : BaseWatcher(WatchType::SIGNAL) { }
+
+ public:
+ typedef SigInfo &SigInfo_p;
+
+ virtual Rearm gotSignal(EventLoop<T_Mutex> * eloop, int signo, SigInfo_p siginfo) = 0;
+ };
+
+ template <typename T_Mutex>
+ class BaseFdWatcher : public BaseWatcher
+ {
+ template <typename, typename Traits> friend class EventDispatch;
+ friend class dasynq::EventLoop<T_Mutex>;
+
+ protected:
+ int watch_fd;
+ int watch_flags;
+ int event_flags;
+
+ BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { }
+
+ public:
+ virtual Rearm gotEvent(EventLoop<T_Mutex> * eloop, int fd, int flags) = 0;
+ };
+
+ template <typename T_Mutex>
+ class BaseBidiFdWatcher : public BaseFdWatcher<T_Mutex>
+ {
+ template <typename, typename Traits> friend class EventDispatch;
+ friend class dasynq::EventLoop<T_Mutex>;
+
+ // This should never actually get called:
+ Rearm gotEvent(EventLoop<T_Mutex> * eloop, int fd, int flags) final
+ {
+ return Rearm::REARM; // should not be reachable.
+ };
+
+ protected:
+
+ // The main instance is the "input" watcher only; we keep a secondary watcher
+ // with a secondary set of flags for the "output" watcher:
+ BaseWatcher outWatcher = BaseWatcher(WatchType::SECONDARYFD);
+
+ int read_removed : 1; // read watch removed?
+ int write_removed : 1; // write watch removed?
+
+ public:
+ virtual Rearm readReady(EventLoop<T_Mutex> * eloop, int fd) noexcept = 0;
+ virtual Rearm writeReady(EventLoop<T_Mutex> * eloop, int fd) noexcept = 0;
+ };
+
+ template <typename T_Mutex>
+ class BaseChildWatcher : public BaseWatcher
+ {
+ template <typename, typename Traits> friend class EventDispatch;
+ friend class dasynq::EventLoop<T_Mutex>;
+
+ protected:
+ pid_t watch_pid;
+ int child_status;
+
+ BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { }
+
+ public:
+ virtual void gotTermStat(EventLoop<T_Mutex> * eloop, pid_t child, int status) = 0;
+ };
+
+ // Classes for implementing a fair(ish) wait queue.
+ // A queue node can be signalled when it reaches the head of
+ // the queue.
+
+ template <typename T_Mutex> class waitqueue;
+ template <typename T_Mutex> class waitqueue_node;
+
+ // Select an appropriate conditiona variable type for a mutex:
+ // condition_variable if mutex is std::mutex, or condition_variable_any
+ // otherwise.
+ template <class T_Mutex> class condvarSelector;
+
+ template <> class condvarSelector<std::mutex>
+ {
+ public:
+ typedef std::condition_variable condvar;
+ };
+
+ template <class T_Mutex> class condvarSelector
+ {
+ public:
+ typedef std::condition_variable_any condvar;
+ };
+
+ template <> class waitqueue_node<NullMutex>
+ {
+ // Specialised waitqueue_node for NullMutex.
+ // TODO can this be reduced to 0 data members?
+ friend class waitqueue<NullMutex>;
+ waitqueue_node * next = nullptr;
+
+ public:
+ void wait(std::unique_lock<NullMutex> &ul) { }
+ void signal() { }
+ };
+
+ template <typename T_Mutex> class waitqueue_node
+ {
+ typename condvarSelector<T_Mutex>::condvar condvar;
+ friend class waitqueue<T_Mutex>;
+ waitqueue_node * next = nullptr;
+
+ public:
+ void signal()
+ {
+ condvar.notify_one();
+ }
+
+ void wait(std::unique_lock<T_Mutex> &mutex_lock)
+ {
+ condvar.wait(mutex_lock);
+ }
+ };
+
+ template <typename T_Mutex> class waitqueue
+ {
+ waitqueue_node<T_Mutex> * tail = nullptr;
+ waitqueue_node<T_Mutex> * head = nullptr;
+
+ public:
+ waitqueue_node<T_Mutex> * unqueue()
+ {
+ head = head->next;
+ return head;
+ }
+
+ waitqueue_node<T_Mutex> * getHead()
+ {
+ return head;
+ }
+
+ void queue(waitqueue_node<T_Mutex> *node)
+ {
+ if (tail) {
+ tail->next = node;
+ }
+ else {
+ head = node;
+ }
+ }
+ };
+
+ // This class serves as the base class (mixin) for the AEN mechanism class.
+ // Note that EventDispatch, here, and EventLoop (below) are really two sides of one coin;
+ // they do not work independently. The mixin pattern that we use to avoid dynamic dispatch
+ // forces them to be two seperate classes, however.
+ //
+ // The EventDispatch class maintains the queued event data structures. It inserts watchers
+ // into the queue when eventes are received (receiveXXX methods).
+ template <typename T_Mutex, typename Traits> class EventDispatch : public Traits
+ {
+ friend class EventLoop<T_Mutex>;
+
+ // queue data structure/pointer
+ BaseWatcher * first;
+
+ using BaseSignalWatcher = dasynq::dprivate::BaseSignalWatcher<T_Mutex>;
+ using BaseFdWatcher = dasynq::dprivate::BaseFdWatcher<T_Mutex>;
+ using BaseBidiFdWatcher = dasynq::dprivate::BaseBidiFdWatcher<T_Mutex>;
+ using BaseChildWatcher = dasynq::dprivate::BaseChildWatcher<T_Mutex>;
+
+ void queueWatcher(BaseWatcher *bwatcher)
+ {
+ // Put in queue:
+ if (first == nullptr) {
+ bwatcher->prev = bwatcher;
+ bwatcher->next = bwatcher;
+ first = bwatcher;
+ }
+ else {
+ first->prev->next = bwatcher;
+ bwatcher->prev = first->prev;
+ first->prev = bwatcher;
+ bwatcher->next = first;
+ }
+ }
+
+ bool isQueued(BaseWatcher *bwatcher)
+ {
+ return bwatcher->prev != nullptr;
+ }
+
+ void dequeueWatcher(BaseWatcher *bwatcher)
+ {
+ if (bwatcher->prev == bwatcher) {
+ // Only item in queue
+ first = nullptr;
+ }
+ else {
+ if (first == bwatcher) first = first->next;
+ bwatcher->prev->next = bwatcher->next;
+ bwatcher->next->prev = bwatcher->prev;
+ }
+
+ bwatcher->prev = nullptr;
+ bwatcher->next = nullptr;
+ }
+
+ protected:
+ T_Mutex lock;
+
+ void receiveSignal(typename Traits::SigInfo & siginfo, void * userdata)
+ {
+ BaseSignalWatcher * bwatcher = static_cast<BaseSignalWatcher *>(userdata);
+ bwatcher->siginfo = siginfo;
+ queueWatcher(bwatcher);
+ }
+
+ template <typename T>
+ void receiveFdEvent(T &loop_mech, typename Traits::FD_r fd_r, void * userdata, int flags)
+ {
+ BaseFdWatcher * bfdw = static_cast<BaseFdWatcher *>(userdata);
+
+ bfdw->event_flags |= flags;
+
+ BaseWatcher * bwatcher = bfdw;
+
+ bool is_multi_watch = bfdw->watch_flags & multi_watch;
+ if (is_multi_watch) {
+ BaseBidiFdWatcher *bbdw = static_cast<BaseBidiFdWatcher *>(bwatcher);
+ if (flags & IN_EVENTS && flags & OUT_EVENTS) {
+ // Queue the secondary watcher first:
+ queueWatcher(&bbdw->outWatcher);
+ }
+ else if (flags & OUT_EVENTS) {
+ // Use the secondary watcher for queueing:
+ bwatcher = &(bbdw->outWatcher);
+ }
+ }
+
+ queueWatcher(bwatcher);
+
+ if (! LoopTraits::has_separate_rw_fd_watches) {
+ // If this is a bidirectional fd-watch, it has been disabled in *both* directions
+ // as the event was delivered. However, the other direction should not be disabled
+ // yet, so we need to re-enable:
+ int in_out_mask = IN_EVENTS | OUT_EVENTS;
+ if (is_multi_watch && bfdw->event_flags != (bfdw->watch_flags & in_out_mask)) {
+ // We need to re-enable the other channel now:
+ loop_mech.enableFdWatch_nolock(bfdw->watch_fd, userdata,
+ (bfdw->watch_flags & ~(bfdw->event_flags)) | ONE_SHOT);
+ }
+ }
+ }
+
+ void receiveChildStat(pid_t child, int status, void * userdata)
+ {
+ BaseChildWatcher * watcher = static_cast<BaseChildWatcher *>(userdata);
+ watcher->child_status = status;
+ queueWatcher(watcher);
+ }
+
+ // Pull a single event from the queue
+ BaseWatcher * pullEvent()
+ {
+ BaseWatcher * r = first;
+ if (r != nullptr) {
+ dequeueWatcher(r);
+ }
+ return r;
+ }
+
+ void issueDelete(BaseWatcher *watcher) noexcept
+ {
+ // This is only called when the attention lock is held, so if the watcher is not
+ // active/queued now, it cannot become active (and will not be reported with an event)
+ // during execution of this function.
+
+ lock.lock();
+
+ // TODO this needs to handle multi-watch (BidiFdWatcher) properly
+
+ if (watcher->active) {
+ // If the watcher is active, set deleteme true; the watcher will be removed
+ // at the end of current processing (i.e. when active is set false).
+ watcher->deleteme = true;
+ lock.unlock();
+ }
+ else {
+ // Actually do the delete.
+ if (isQueued(watcher)) {
+ dequeueWatcher(watcher);
+ }
+
+ lock.unlock();
+ watcher->watchRemoved();
+ }
+ }
+
+ void issueDelete(BaseBidiFdWatcher *watcher) noexcept
+ {
+ lock.lock();
+
+ if (watcher->active) {
+ watcher->deleteme = true;
+ }
+ else {
+ if (isQueued(watcher)) {
+ dequeueWatcher(watcher);
+ }
+
+ watcher->read_removed = true;
+ }
+
+ BaseWatcher *secondary = &(watcher->outWatcher);
+ if (secondary->active) {
+ secondary->deleteme = true;
+ }
+ else {
+ if (isQueued(secondary)) {
+ dequeueWatcher(secondary);
+ }
+
+ watcher->write_removed = true;
+ }
+
+ if (watcher->read_removed && watcher->write_removed) {
+ lock.unlock();
+ watcher->watchRemoved();
+ }
+ else {
+ lock.unlock();
+ }
+ }
+ };
+}
+
+
+template <typename T_Mutex> class EventLoop
+{
+ friend class FdWatcher<T_Mutex>;
+ friend class BidiFdWatcher<T_Mutex>;
+ friend class SignalWatcher<T_Mutex>;
+ friend class ChildProcWatcher<T_Mutex>;
+
+ template <typename T, typename U> using EventDispatch = dprivate::EventDispatch<T,U>;
+ template <typename T> using waitqueue = dprivate::waitqueue<T>;
+ template <typename T> using waitqueue_node = dprivate::waitqueue_node<T>;
+ using BaseWatcher = dprivate::BaseWatcher;
+ using BaseSignalWatcher = dprivate::BaseSignalWatcher<T_Mutex>;
+ using BaseFdWatcher = dprivate::BaseFdWatcher<T_Mutex>;
+ using BaseBidiFdWatcher = dprivate::BaseBidiFdWatcher<T_Mutex>;
+ using BaseChildWatcher = dprivate::BaseChildWatcher<T_Mutex>;
+ using WatchType = dprivate::WatchType;
+
+ Loop<ChildProcEvents<EventDispatch<T_Mutex, LoopTraits>>> loop_mech;
+
+ // There is a complex problem with most asynchronous event notification mechanisms
+ // when used in a multi-threaded environment. Generally, a file descriptor or other
+ // event type that we are watching will be associated with some data used to manage
+ // that event source. For example a web server needs to maintain information about
+ // each client connection, such as the state of the connection (what protocol version
+ // has been negotiated, etc; if a transfer is taking place, what file is being
+ // transferred etc).
+ //
+ // However, sometimes we want to remove an event source (eg webserver wants to drop
+ // a connection) and delete the associated data. The problem here is that it is
+ // difficult to be sure when it is ok to actually remove the data, since when
+ // requesting to unwatch the source in one thread it is still possible that an
+ // event from that source is just being reported to another thread (in which case
+ // the data will be needed).
+ //
+ // To solve that, we:
+ // - allow only one thread to poll for events at a time, using a lock
+ // - use the same lock to prevent polling, if we want to unwatch an event source
+ // - generate an event to interrupt any polling that may already be occurring in
+ // another thread
+ // - mark handlers as active if they are currently executing, and
+ // - when removing an active handler, simply set a flag which causes it to be
+ // removed once the current processing is finished, rather than removing it
+ // immediately.
+ //
+ // In particular the lock mechanism for preventing multiple threads polling and
+ // for allowing polling to be interrupted is tricky. We can't use a simple mutex
+ // since there is significant chance that it will be highly contended and there
+ // are no guarantees that its acquisition will be fair. In particular, we don't
+ // want a thread that is trying to unwatch a source being starved while another
+ // thread polls the event source.
+ //
+ // So, we use two wait queues protected by a single mutex. The "attn_waitqueue"
+ // (attention queue) is the high-priority queue, used for threads wanting to
+ // unwatch event sources. The "wait_waitquueue" is the queue used by threads
+ // that wish to actually poll for events.
+ // - The head of the "attn_waitqueue" is always the holder of the lock
+ // - Therefore, a poll-waiter must be moved from the wait_waitqueue to the
+ // attn_waitqueue to actually gain the lock. This is only done if the
+ // attn_waitqueue is otherwise empty.
+ // - The mutex only protects manipulation of the wait queues, and so should not
+ // be highly contended.
+
+ T_Mutex wait_lock; // wait lock, used to prevent multiple threads from waiting
+ // on the event queue simultaneously.
+ waitqueue<T_Mutex> attn_waitqueue;
+ waitqueue<T_Mutex> wait_waitqueue;
+
+ T_Mutex &getBaseLock()
+ {
+ return loop_mech.lock;
+ }
+
+ void registerSignal(BaseSignalWatcher *callBack, int signo)
+ {
+ loop_mech.addSignalWatch(signo, callBack);
+ }
+
+ void deregister(BaseSignalWatcher *callBack, int signo) noexcept
+ {
+ loop_mech.removeSignalWatch(signo);
+
+ waitqueue_node<T_Mutex> qnode;
+ getAttnLock(qnode);
+
+ EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
+ ed.issueDelete(callBack);
+
+ releaseLock(qnode);
+ }
+
+ void registerFd(BaseFdWatcher *callback, int fd, int eventmask)
+ {
+ loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT);
+ }
+
+ void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask)
+ {
+ if (LoopTraits::has_separate_rw_fd_watches) {
+ // TODO
+ }
+ else {
+ loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT);
+ }
+ }
+
+ void setFdEnabled(BaseWatcher *watcher, int fd, int watch_flags, bool enabled)
+ {
+ if (enabled) {
+ loop_mech.enableFdWatch(fd, watcher, watch_flags | ONE_SHOT);
+ }
+ else {
+ loop_mech.disableFdWatch(fd, watch_flags);
+ }
+ }
+
+ void setFdEnabled_nolock(BaseWatcher *watcher, int fd, int watch_flags, bool enabled)
+ {
+ if (enabled) {
+ loop_mech.enableFdWatch_nolock(fd, watcher, watch_flags | ONE_SHOT);
+ }
+ else {
+ loop_mech.disableFdWatch_nolock(fd, watch_flags);
+ }
+ }
+
+ void deregister(BaseFdWatcher *callback, int fd)
+ {
+ loop_mech.removeFdWatch(fd, callback->watch_flags);
+
+ waitqueue_node<T_Mutex> qnode;
+ getAttnLock(qnode);
+
+ EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
+ ed.issueDelete(callback);
+
+ releaseLock(qnode);
+ }
+
+ void deregister(BaseBidiFdWatcher *callback, int fd)
+ {
+ if (LoopTraits::has_separate_rw_fd_watches) {
+ // TODO
+ }
+ else {
+ loop_mech.removeFdWatch(fd, callback->watch_flags);
+ }
+
+ waitqueue_node<T_Mutex> qnode;
+ getAttnLock(qnode);
+
+ EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
+ ed.issueDelete(callback);
+
+ releaseLock(qnode);
+ }
+
+ void reserveChildWatch(BaseChildWatcher *callBack)
+ {
+ loop_mech.reserveChildWatch();
+ }
+
+ void registerChild(BaseChildWatcher *callBack, pid_t child)
+ {
+ loop_mech.addChildWatch(child, callBack);
+ }
+
+ void registerReservedChild(BaseChildWatcher *callBack, pid_t child) noexcept
+ {
+ loop_mech.addReservedChildWatch(child, callBack);
+ }
+
+ void dequeueWatcher(BaseWatcher *watcher) noexcept
+ {
+ if (loop_mech.isQueued(watcher)) {
+ loop_mech.dequeueWatcher(watcher);
+ }
+ }
+
+ // Acquire the attention lock (when held, ensures that no thread is polling the AEN
+ // mechanism).
+ void getAttnLock(waitqueue_node<T_Mutex> &qnode)
+ {
+ std::unique_lock<T_Mutex> ulock(wait_lock);
+ attn_waitqueue.queue(&qnode);
+ if (attn_waitqueue.getHead() != &qnode) {
+ loop_mech.interruptWait();
+ while (attn_waitqueue.getHead() != &qnode) {
+ qnode.wait(ulock);
+ }
+ }
+ }
+
+ // Acquire the poll-wait lock (to be held when polling the AEN mechanism; lower
+ // priority than the attention lock).
+ void getPollwaitLock(waitqueue_node<T_Mutex> &qnode)
+ {
+ std::unique_lock<T_Mutex> ulock(wait_lock);
+ if (attn_waitqueue.getHead() == nullptr) {
+ // Queue is completely empty:
+ attn_waitqueue.queue(&qnode);
+ }
+ else {
+ wait_waitqueue.queue(&qnode);
+ }
+
+ while (attn_waitqueue.getHead() != &qnode) {
+ qnode.wait(ulock);
+ }
+ }
+
+ // Release the poll-wait/attention lock.
+ void releaseLock(waitqueue_node<T_Mutex> &qnode)
+ {
+ std::unique_lock<T_Mutex> ulock(wait_lock);
+ waitqueue_node<T_Mutex> * nhead = attn_waitqueue.unqueue();
+ if (nhead != nullptr) {
+ nhead->signal();
+ }
+ else {
+ nhead = wait_waitqueue.getHead();
+ if (nhead != nullptr) {
+ attn_waitqueue.queue(nhead);
+ nhead->signal();
+ }
+ }
+ }
+
+ void processSignalRearm(BaseSignalWatcher * bsw, Rearm rearmType)
+ {
+ // Called with lock held
+ if (rearmType == Rearm::REARM) {
+ loop_mech.rearmSignalWatch_nolock(bsw->siginfo.get_signo());
+ }
+ else if (rearmType == Rearm::REMOVE) {
+ loop_mech.removeSignalWatch_nolock(bsw->siginfo.get_signo());
+ }
+ }
+
+ Rearm processFdRearm(BaseFdWatcher * bfw, Rearm rearmType, bool is_multi_watch)
+ {
+ // Called with lock held
+ if (is_multi_watch) {
+ BaseBidiFdWatcher * bdfw = static_cast<BaseBidiFdWatcher *>(bfw);
+
+ if (rearmType == Rearm::REMOVE) {
+ bdfw->read_removed = 1;
+ bdfw->watch_flags &= ~IN_EVENTS;
+
+ if (! LoopTraits::has_separate_rw_fd_watches) {
+ if (! bdfw->write_removed) {
+ return Rearm::NOOP;
+ }
+ else {
+ // both removed: actually remove
+ loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */);
+ return Rearm::REMOVE;
+ }
+ }
+ else {
+ // TODO this will need flags for such a loop, since it can't
+ // otherwise distinguish which channel watch to remove
+ loop_mech.removeFdWatch_nolock(bdfw->watch_fd, bdfw->watch_flags);
+ }
+ }
+ else if (rearmType == Rearm::DISARM) {
+ // Nothing more to do
+ }
+ else if (rearmType == Rearm::REARM) {
+ bdfw->watch_flags |= IN_EVENTS;
+ if (! LoopTraits::has_separate_rw_fd_watches) {
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+ static_cast<BaseWatcher *>(bdfw),
+ (bdfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+ }
+ else {
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+ static_cast<BaseWatcher *>(bdfw),
+ IN_EVENTS | ONE_SHOT);
+ }
+ }
+ return rearmType;
+ }
+ else {
+ if (rearmType == Rearm::REARM) {
+ loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw,
+ (bfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+ }
+ else if (rearmType == Rearm::REMOVE) {
+ loop_mech.removeFdWatch_nolock(bfw->watch_fd, bfw->watch_flags);
+ }
+ return rearmType;
+ }
+ }
+
+ Rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, Rearm rearmType)
+ {
+ // Called with lock held
+ if (rearmType == Rearm::REMOVE) {
+ bdfw->write_removed = 1;
+ bdfw->watch_flags &= ~OUT_EVENTS;
+
+ if (LoopTraits::has_separate_rw_fd_watches) {
+ // TODO this will need flags for such a loop, since it can't
+ // otherwise distinguish which channel watch to remove
+ loop_mech.removeFdWatch_nolock(bdfw->watch_fd, bdfw->watch_flags);
+ return bdfw->read_removed ? Rearm::REMOVE : Rearm::NOOP;
+ }
+ else {
+ if (! bdfw->read_removed) {
+ return Rearm::NOOP;
+ }
+ else {
+ // both removed: actually remove
+ loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */);
+ return Rearm::REMOVE;
+ }
+ }
+ }
+ else if (rearmType == Rearm::DISARM) {
+ // Nothing more to do
+ }
+ else if (rearmType == Rearm::REARM) {
+ bdfw->watch_flags |= OUT_EVENTS;
+ if (! LoopTraits::has_separate_rw_fd_watches) {
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+ static_cast<BaseWatcher *>(bdfw),
+ (bdfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+ }
+ else {
+ loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+ static_cast<BaseWatcher *>(bdfw),
+ OUT_EVENTS | ONE_SHOT);
+ }
+ }
+ return rearmType;
+ }
+
+ bool processEvents() noexcept
+ {
+ EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
+ ed.lock.lock();
+
+ // So this pulls *all* currently pending events and processes them in the current thread.
+ // That's probably good for throughput, but maybe the behavior should be configurable.
+
+ BaseWatcher * pqueue = ed.pullEvent();
+ bool active = false;
+
+ while (pqueue != nullptr) {
+
+ pqueue->active = true;
+ active = true;
+
+ Rearm rearmType = Rearm::NOOP;
+ bool is_multi_watch = false;
+ BaseBidiFdWatcher *bbfw = nullptr;
+
+ // (Above variables are initialised only to silence compiler warnings).
+
+ // Read/manipulate watch_flags (if necessary) *before* we release the lock:
+ if (pqueue->watchType == WatchType::FD) {
+ BaseFdWatcher *bfw = static_cast<BaseFdWatcher *>(pqueue);
+ bbfw = static_cast<BaseBidiFdWatcher *>(bfw);
+ is_multi_watch = bfw->watch_flags & dprivate::multi_watch;
+ if (! LoopTraits::has_separate_rw_fd_watches && is_multi_watch) {
+ // Clear the input watch flags to avoid enabling read watcher while active:
+ bfw->watch_flags &= ~IN_EVENTS;
+ }
+ }
+ else if (pqueue->watchType == WatchType::SECONDARYFD) {
+ is_multi_watch = true;
+ char * rp = (char *)pqueue;
+ rp -= offsetof(BaseBidiFdWatcher, outWatcher);
+ bbfw = (BaseBidiFdWatcher *)rp;
+ if (! LoopTraits::has_separate_rw_fd_watches) {
+ bbfw->watch_flags &= ~OUT_EVENTS;
+ }
+ }
+
+ ed.lock.unlock();
+
+ // Note that we select actions based on the type of the watch, as determined by the watchType
+ // member. In some ways this screams out for polmorphism; a virtual function could be overridden
+ // by each of the watcher types. I've instead used switch/case because I think it will perform
+ // slightly better without the overhead of a virtual function dispatch, but it's got to be a
+ // close call; I might be guilty of premature optimisation here.
+
+ switch (pqueue->watchType) {
+ case WatchType::SIGNAL: {
+ BaseSignalWatcher *bsw = static_cast<BaseSignalWatcher *>(pqueue);
+ rearmType = bsw->gotSignal(this, bsw->siginfo.get_signo(), bsw->siginfo);
+ break;
+ }
+ case WatchType::FD: {
+ BaseFdWatcher *bfw = static_cast<BaseFdWatcher *>(pqueue);
+ if (is_multi_watch) {
+ // The primary watcher for a multi-watch watcher is queued for
+ // read events.
+ rearmType = bbfw->readReady(this, bfw->watch_fd);
+ }
+ else {
+ rearmType = bfw->gotEvent(this, bfw->watch_fd, bfw->event_flags);
+ }
+ break;
+ }
+ case WatchType::CHILD: {
+ BaseChildWatcher *bcw = static_cast<BaseChildWatcher *>(pqueue);
+ bcw->gotTermStat(this, bcw->watch_pid, bcw->child_status);
+ // Child watches automatically remove:
+ // TODO what if they want to return REMOVED...
+ rearmType = Rearm::REMOVE;
+ break;
+ }
+ case WatchType::SECONDARYFD: {
+ // first construct a pointer to the main watcher:
+ rearmType = bbfw->writeReady(this, bbfw->watch_fd);
+ break;
+ }
+ default: ;
+ }
+
+ ed.lock.lock();
+
+ // (if REMOVED, we must not touch pqueue at all)
+ if (rearmType != Rearm::REMOVED) {
+
+ pqueue->active = false;
+ if (pqueue->deleteme) {
+ // We don't want a watch that is marked "deleteme" to re-arm itself.
+ rearmType = Rearm::REMOVE;
+ }
+ switch (pqueue->watchType) {
+ case WatchType::SIGNAL:
+ processSignalRearm(static_cast<BaseSignalWatcher *>(pqueue), rearmType);
+ break;
+ case WatchType::FD:
+ rearmType = processFdRearm(static_cast<BaseFdWatcher *>(pqueue), rearmType, is_multi_watch);
+ break;
+ case WatchType::SECONDARYFD:
+ rearmType = processSecondaryRearm(bbfw, rearmType);
+ break;
+ default: ;
+ }
+
+ if (pqueue->deleteme || rearmType == Rearm::REMOVE) {
+ ed.lock.unlock();
+ (is_multi_watch ? bbfw : pqueue)->watchRemoved();
+ ed.lock.lock();
+ }
+ }
+
+ pqueue = ed.pullEvent();
+ }
+
+ ed.lock.unlock();
+ return active;
+ }
+
+
+ public:
+ void run() noexcept
+ {
+ while (! processEvents()) {
+ waitqueue_node<T_Mutex> qnode;
+
+ // We only allow one thread to poll the mechanism at any time, since otherwise
+ // removing event watchers is a nightmare beyond comprehension.
+ getPollwaitLock(qnode);
+
+ // Pull events from the AEN mechanism and insert them in our internal queue:
+ loop_mech.pullEvents(true);
+
+ // Now release the wait lock:
+ releaseLock(qnode);
+ }
+ }
+};
+
+
+typedef EventLoop<NullMutex> NEventLoop;
+typedef EventLoop<std::mutex> TEventLoop;
+
+// from dasync.cc:
+TEventLoop & getSystemLoop();
+
+// Posix signal event watcher
+template <typename T_Mutex>
+class SignalWatcher : private dprivate::BaseSignalWatcher<T_Mutex>
+{
+ using BaseWatcher = dprivate::BaseWatcher;
+
+public:
+ using SigInfo_p = typename dprivate::BaseSignalWatcher<T_Mutex>::SigInfo_p;
+
+ // Register this watcher to watch the specified signal.
+ // If an attempt is made to register with more than one event loop at
+ // a time, behaviour is undefined.
+ inline void registerWatch(EventLoop<T_Mutex> *eloop, int signo)
+ {
+ BaseWatcher::init();
+ this->siginfo.set_signo(signo);
+ eloop->registerSignal(this, signo);
+ }
+
+ inline void deregisterWatch(EventLoop<T_Mutex> *eloop) noexcept
+ {
+ eloop->deregister(this, this->siginfo.get_signo());
+ }
+
+ // virtual Rearm gotSignal(EventLoop<T_Mutex> *, int signo, SigInfo_p info) = 0;
+};
+
+// Posix file descriptor event watcher
+template <typename T_Mutex>
+class FdWatcher : private dprivate::BaseFdWatcher<T_Mutex>
+{
+ using BaseWatcher = dprivate::BaseWatcher;
+
+ protected:
+
+ // Set the types of event to watch. Only supported if LoopTraits::has_bidi_fd_watch
+ // is true; otherwise has unspecified behavior.
+ // Only safe to call from within the callback handler (gotEvent). Might not take
+ // effect until the current callback handler returns with REARM.
+ void setWatchFlags(int newFlags)
+ {
+ this->watch_flags = newFlags;
+ }
+
+ public:
+
+ // Register a file descriptor watcher with an event loop. Flags
+ // can be any combination of dasynq::IN_EVENTS / dasynq::OUT_EVENTS.
+ // Exactly one of IN_EVENTS/OUT_EVENTS must be specified if the event
+ // loop does not support bi-directional fd watchers (i.e. if
+ // ! LoopTraits::has_bidi_fd_watch).
+ //
+ // Mechanisms supporting dual watchers allow for two watchers for a
+ // single file descriptor (one watching read status and the other
+ // write status). Others mechanisms support only a single watcher
+ // per file descriptor. Adding a watcher beyond what is supported
+ // causes undefined behavior.
+ //
+ // Can fail with std::bad_alloc or std::system_error.
+ void registerWith(EventLoop<T_Mutex> *eloop, int fd, int flags)
+ {
+ BaseWatcher::init();
+ this->watch_fd = fd;
+ this->watch_flags = flags;
+ eloop->registerFd(this, fd, flags);
+ }
+
+ // Deregister a file descriptor watcher.
+ //
+ // If other threads may be polling the event loop, it is not safe to assume
+ // the watcher is unregistered until the watchRemoved() callback is issued
+ // (which will not occur until the event handler returns, if it is active).
+ // In a single threaded environment, it is safe to delete the watcher after
+ // calling this method as long as the handler (if it is active) accesses no
+ // internal state and returns Rearm::REMOVED.
+ // TODO: implement REMOVED, or correct above statement.
+ void deregisterWatch(EventLoop<T_Mutex> *eloop) noexcept
+ {
+ eloop->deregister(this, this->watch_fd);
+ }
+
+ void setEnabled(EventLoop<T_Mutex> *eloop, bool enable) noexcept
+ {
+ std::lock_guard<T_Mutex> guard(eloop->getBaseLock());
+ eloop->setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable);
+ if (! enable) {
+ eloop->dequeueWatcher(this);
+ }
+ }
+
+ // virtual Rearm gotEvent(EventLoop<T_Mutex> *, int fd, int flags) = 0;
+};
+
+// A Bi-directional file descriptor watcher with independent read- and write- channels.
+// This watcher type has two event notification methods which can both potentially be
+// active at the same time.
+template <typename T_Mutex>
+class BidiFdWatcher : private dprivate::BaseBidiFdWatcher<T_Mutex>
+{
+ using BaseWatcher = dprivate::BaseWatcher;
+
+ void setWatchEnabled(EventLoop<T_Mutex> *eloop, bool in, bool b)
+ {
+ int events = in ? IN_EVENTS : OUT_EVENTS;
+
+ if (b) {
+ this->watch_flags |= events;
+ }
+ else {
+ this->watch_flags &= ~events;
+ }
+ if (LoopTraits::has_separate_rw_fd_watches) {
+ dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher;
+ eloop->setFdEnabled_nolock(watcher, this->watch_fd, events | ONE_SHOT, b);
+ if (! b) {
+ eloop->dequeueWatcher(watcher);
+ }
+ }
+ else {
+ eloop->setFdEnabled_nolock(this, this->watch_fd,
+ (this->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT,
+ (this->watch_flags & (IN_EVENTS | OUT_EVENTS)) != 0);
+ if (! b) {
+ dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher;
+ eloop->dequeueWatcher(watcher);
+ }
+ }
+ }
+
+ protected:
+
+ // TODO if a watch is disabled and currently queued, we should de-queue it.
+
+ void setInWatchEnabled(EventLoop<T_Mutex> *eloop, bool b) noexcept
+ {
+ eloop->getBaseLock().lock();
+ setWatchEnabled(eloop, true, b);
+ eloop->getBaseLock().unlock();
+ }
+
+ void setOutWatchEnabled(EventLoop<T_Mutex> *eloop, bool b) noexcept
+ {
+ eloop->getBaseLock().lock();
+ setWatchEnabled(eloop, false, b);
+ eloop->getBaseLock().unlock();
+ }
+
+ // Set the watch flags, which enables/disables both the in-watch and the out-watch accordingly.
+ //
+ // Concurrency: this method can only be called if
+ // - it does not enable a watcher that might currently be active
+ /// - unless the event loop will not be polled while the watcher is active.
+ // (i.e. it is ok to call setWatchFlags from within the readReady/writeReady handlers if no other
+ // thread will poll the event loop; it is ok to *dis*able a watcher that might be active).
+ void setWatchFlags(EventLoop<T_Mutex> * eloop, int newFlags)
+ {
+ std::lock_guard<T_Mutex> guard(eloop->getBaseLock());
+ if (LoopTraits::has_separate_rw_fd_watches) {
+ setWatchEnabled(eloop, true, (newFlags & IN_EVENTS) != 0);
+ setWatchEnabled(eloop, false, (newFlags & OUT_EVENTS) != 0);
+ }
+ else {
+ this->watch_flags = (this->watch_flags & ~IO_EVENTS) | newFlags;
+ eloop->setFdEnabled((dprivate::BaseWatcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true);
+ }
+ }
+
+ public:
+
+ // Register a bi-direction file descriptor watcher with an event loop. Flags
+ // can be any combination of dasynq::IN_EVENTS / dasynq::OUT_EVENTS.
+ //
+ // Can fail with std::bad_alloc or std::system_error.
+ void registerWith(EventLoop<T_Mutex> *eloop, int fd, int flags)
+ {
+ BaseWatcher::init();
+ this->outWatcher.BaseWatcher::init();
+ this->watch_fd = fd;
+ this->watch_flags = flags | dprivate::multi_watch;
+ eloop->registerFd(this, fd, flags);
+ }
+
+ // Deregister a bi-direction file descriptor watcher.
+ //
+ // If other threads may be polling the event loop, it is not safe to assume
+ // the watcher is unregistered until the watchRemoved() callback is issued
+ // (which will not occur until the event handler returns, if it is active).
+ // In a single threaded environment, it is safe to delete the watcher after
+ // calling this method as long as the handler (if it is active) accesses no
+ // internal state and returns Rearm::REMOVED.
+ // TODO: implement REMOVED, or correct above statement.
+ void deregisterWatch(EventLoop<T_Mutex> *eloop) noexcept
+ {
+ eloop->deregister(this, this->watch_fd);
+ }
+
+ // Rearm readReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
+ // Rearm writeReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
+};
+
+// Child process event watcher
+template <typename T_Mutex>
+class ChildProcWatcher : private dprivate::BaseChildWatcher<T_Mutex>
+{
+ using BaseWatcher = dprivate::BaseWatcher;
+
+ public:
+ // Reserve resources for a child watcher with the given event loop.
+ // Reservation can fail with std::bad_alloc.
+ void reserveWith(EventLoop<T_Mutex> *eloop)
+ {
+ eloop->reserveChildWatch();
+ }
+
+ // Register a watcher for the given child process with an event loop.
+ // Registration can fail with std::bad_alloc.
+ void registerWith(EventLoop<T_Mutex> *eloop, pid_t child)
+ {
+ BaseWatcher::init();
+ this->watch_pid = child;
+ eloop->registerChild(this, child);
+ }
+
+ // Register a watcher for the given child process with an event loop,
+ // after having reserved resources previously (using reserveWith).
+ // Registration cannot fail.
+ void registerReserved(EventLoop<T_Mutex> *eloop, pid_t child) noexcept
+ {
+ BaseWatcher::init();
+ eloop->registerReservedChild(this, child);
+ }
+
+ // virtual void gotTermStat(EventLoop<T_Mutex> *, pid_t child, int status) = 0;
+};
+
+} // namespace dasynq
+
+#endif
#include <sys/syslog.h>
#include <sys/uio.h>
-#include "dasync.h"
+#include "dasynq.h"
#include "service.h"
#include "dinit-log.h"
static ServiceSet *service_set = nullptr; // Reference to service set
namespace {
-class BufferedLogStream : public PosixFdWatcher<NullMutex>
+class BufferedLogStream : public FdWatcher<NullMutex>
{
private:
// Try to flush any messages that are currently buffered. (Console is non-blocking
// so it will fail gracefully).
- if (gotEvent(&eventLoop, fd, out_events) == Rearm::DISARM) {
+ if (gotEvent(&eventLoop, fd, OUT_EVENTS) == Rearm::DISARM) {
// Console has already been released at this point.
setEnabled(&eventLoop, false);
}
void init_log(ServiceSet *sset)
{
service_set = sset;
- log_stream[DLOG_CONS].registerWith(&eventLoop, STDOUT_FILENO, out_events); // TODO register in disabled state
+ log_stream[DLOG_CONS].registerWith(&eventLoop, STDOUT_FILENO, OUT_EVENTS); // TODO register in disabled state
enable_console_log(true);
}
void setup_main_log(int fd)
{
log_stream[DLOG_MAIN].init(fd);
- log_stream[DLOG_MAIN].registerWith(&eventLoop, fd, out_events);
+ log_stream[DLOG_MAIN].registerWith(&eventLoop, fd, OUT_EVENTS);
}
bool is_log_flushed() noexcept
#include <fcntl.h>
#include <pwd.h>
-#include "dasync.h"
+#include "dasynq.h"
#include "service.h"
#include "control.h"
#include "dinit-log.h"
*/
-using namespace dasync;
+using namespace dasynq;
using EventLoop_t = EventLoop<NullMutex>;
EventLoop_t eventLoop = EventLoop_t();
static void control_socket_cb(EventLoop_t *loop, int fd);
-class ControlSocketWatcher : public PosixFdWatcher<NullMutex>
+class ControlSocketWatcher : public FdWatcher<NullMutex>
{
Rearm gotEvent(EventLoop_t * loop, int fd, int flags)
{
void registerWith(EventLoop_t * loop, int fd, int flags)
{
this->fd = fd;
- PosixFdWatcher<NullMutex>::registerWith(loop, fd, flags);
+ FdWatcher<NullMutex>::registerWith(loop, fd, flags);
}
};
namespace {
- class CallbackSignalHandler : public PosixSignalWatcher<NullMutex>
+ class CallbackSignalHandler : public SignalWatcher<NullMutex>
{
public:
typedef void (*cb_func_t)(EventLoop_t *);
}
};
- class ControlSocketWatcher : public PosixFdWatcher<NullMutex>
+ class ControlSocketWatcher : public FdWatcher<NullMutex>
{
Rearm gotEvent(EventLoop_t * loop, int fd, int flags)
{
}
control_socket_open = true;
- control_socket_io.registerWith(&eventLoop, sockfd, in_events);
+ control_socket_io.registerWith(&eventLoop, sockfd, IN_EVENTS);
}
}
// Listen for status
// TODO should set this up earlier so we can handle failure case (exception)
- child_status_listener.registerWith(&eventLoop, pipefd[0], in_events);
+ child_status_listener.registerWith(&eventLoop, pipefd[0], IN_EVENTS);
// Add a process listener so we can detect when the
// service stops
#include <csignal>
#include <unordered_set>
-#include "dasync.h"
+#include "dasynq.h"
#include "control.h"
#include "service-listener.h"
return r;
}
-class ServiceChildWatcher : public PosixChildWatcher<NullMutex>
+class ServiceChildWatcher : public ChildProcWatcher<NullMutex>
{
public:
// TODO resolve clunkiness of storing this field
ServiceChildWatcher(ServiceRecord * sr) noexcept : service(sr) { }
};
-class ServiceIoWatcher : public PosixFdWatcher<NullMutex>
+class ServiceIoWatcher : public FdWatcher<NullMutex>
{
public:
// TODO resolve clunkiness of storing these fields
void registerWith(EventLoop_t *loop, int fd, int flags)
{
this->fd = fd;
- PosixFdWatcher<NullMutex>::registerWith(loop, fd, flags);
+ FdWatcher<NullMutex>::registerWith(loop, fd, flags);
}
};