From: Davin McCall Date: Thu, 16 Jun 2016 18:31:01 +0000 (+0100) Subject: Pull in latest Dasynq, which means a massive amount of renaming. X-Git-Tag: v0.03~14 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=4374867afe6dea191dbdfd6e7b051f182cba8425;p=oweals%2Fdinit.git Pull in latest Dasynq, which means a massive amount of renaming. --- diff --git a/src/Makefile b/src/Makefile index 2159cdc..b671fd5 100644 --- a/src/Makefile +++ b/src/Makefile @@ -21,7 +21,7 @@ dinit-reboot: dinit-reboot.o $(CXX) -o dinit-reboot dinit-reboot.o $(objects): %.o: %.cc service.h dinit-log.h control.h control-cmds.h cpbuffer.h - $(CXX) $(CXXOPTS) -Idasync -c $< -o $@ + $(CXX) $(CXXOPTS) -Idasynq -c $< -o $@ #install: all diff --git a/src/control.cc b/src/control.cc index f716f39..20478cf 100644 --- a/src/control.cc +++ b/src/control.cc @@ -51,7 +51,7 @@ bool ControlConn::processPacket() char outbuf[] = { DINIT_RP_BADREQ }; if (! queuePacket(outbuf, 1)) return false; bad_conn_close = true; - iob.setWatchFlags(out_events); + iob.setWatchFlags(OUT_EVENTS); } return true; } @@ -75,7 +75,7 @@ bool ControlConn::processFindLoad(int pktType) char badreqRep[] = { DINIT_RP_BADREQ }; if (! queuePacket(badreqRep, 1)) return false; bad_conn_close = true; - iob.setWatchFlags(out_events); + iob.setWatchFlags(OUT_EVENTS); return true; } @@ -151,7 +151,7 @@ bool ControlConn::processStartStop(int pktType) char badreqRep[] = { DINIT_RP_BADREQ }; if (! queuePacket(badreqRep, 1)) return false; bad_conn_close = true; - iob.setWatchFlags(out_events); + iob.setWatchFlags(OUT_EVENTS); return true; } else { @@ -225,7 +225,7 @@ bool ControlConn::processUnpinService() char badreqRep[] = { DINIT_RP_BADREQ }; if (! queuePacket(badreqRep, 1)) return false; bad_conn_close = true; - iob.setWatchFlags(out_events); + iob.setWatchFlags(OUT_EVENTS); return true; } else { @@ -268,7 +268,7 @@ ControlConn::handle_t ControlConn::allocateServiceHandle(ServiceRecord *record) bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept { - int in_flag = bad_conn_close ? 0 : in_events; + int in_flag = bad_conn_close ? 0 : IN_EVENTS; bool was_empty = outbuf.empty(); // If the queue is empty, we can try to write the packet out now rather than queueing it. @@ -299,7 +299,7 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept // Create a vector out of the (remaining part of the) packet: try { outbuf.emplace_back(pkt, pkt + size); - iob.setWatchFlags(in_flag | out_events); + iob.setWatchFlags(in_flag | OUT_EVENTS); return true; } catch (std::bad_alloc &baexc) { @@ -313,7 +313,7 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept return false; } else { - iob.setWatchFlags(out_events); + iob.setWatchFlags(OUT_EVENTS); return true; } } @@ -323,7 +323,7 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept // make them extraordinary difficult to combine into a single method. bool ControlConn::queuePacket(std::vector &&pkt) noexcept { - int in_flag = bad_conn_close ? 0 : in_events; + int in_flag = bad_conn_close ? 0 : IN_EVENTS; bool was_empty = outbuf.empty(); if (was_empty) { @@ -352,7 +352,7 @@ bool ControlConn::queuePacket(std::vector &&pkt) noexcept try { outbuf.emplace_back(pkt); - iob.setWatchFlags(in_flag | out_events); + iob.setWatchFlags(in_flag | OUT_EVENTS); return true; } catch (std::bad_alloc &baexc) { @@ -366,7 +366,7 @@ bool ControlConn::queuePacket(std::vector &&pkt) noexcept return false; } else { - iob.setWatchFlags(out_events); + iob.setWatchFlags(OUT_EVENTS); return true; } } @@ -412,11 +412,11 @@ bool ControlConn::dataReady() noexcept // TODO log error? // TODO error response? bad_conn_close = true; - iob.setWatchFlags(out_events); + iob.setWatchFlags(OUT_EVENTS); } else { - int out_flags = (bad_conn_close || !outbuf.empty()) ? out_events : 0; - iob.setWatchFlags(in_events | out_flags); + int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0; + iob.setWatchFlags(IN_EVENTS | out_flags); } return false; @@ -458,7 +458,7 @@ bool ControlConn::sendData() noexcept outpkt_index = 0; if (outbuf.empty() && ! oom_close) { if (! bad_conn_close) { - iob.setWatchFlags(in_events); + iob.setWatchFlags(IN_EVENTS); } else { return true; diff --git a/src/control.h b/src/control.h index 5f420b2..f76ee28 100644 --- a/src/control.h +++ b/src/control.h @@ -9,7 +9,7 @@ #include -#include "dasync.h" +#include "dasynq.h" #include "dinit-log.h" #include "control-cmds.h" @@ -18,14 +18,14 @@ // Control connection for dinit -using namespace dasync; +using namespace dasynq; using EventLoop_t = EventLoop; class ControlConn; class ControlConnWatcher; // forward-declaration of callback: -static dasync::Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents); +static Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents); // Pointer to the control connection that is listening for rollback completion extern ControlConn * rollback_handler_conn; @@ -47,18 +47,18 @@ extern int active_control_conns; class ServiceSet; class ServiceRecord; -class ControlConnWatcher : public PosixBidiFdWatcher +class ControlConnWatcher : public BidiFdWatcher { inline Rearm receiveEvent(EventLoop_t * loop, int fd, int flags) noexcept; Rearm readReady(EventLoop_t * loop, int fd) noexcept override { - return receiveEvent(loop, fd, in_events); + return receiveEvent(loop, fd, IN_EVENTS); } Rearm writeReady(EventLoop_t * loop, int fd) noexcept override { - return receiveEvent(loop, fd, out_events); + return receiveEvent(loop, fd, OUT_EVENTS); } public: @@ -67,14 +67,14 @@ class ControlConnWatcher : public PosixBidiFdWatcher void setWatchFlags(int flags) { - PosixBidiFdWatcher::setWatchFlags(eventLoop, flags); + BidiFdWatcher::setWatchFlags(eventLoop, flags); } void registerWith(EventLoop_t *loop, int fd, int flags) { this->fd = fd; this->eventLoop = loop; - PosixBidiFdWatcher::registerWith(loop, fd, flags); + BidiFdWatcher::registerWith(loop, fd, flags); } }; @@ -86,7 +86,7 @@ inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t * loop, int fd, int fl class ControlConn : private ServiceListener { - friend dasync::Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents); + friend Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents); ControlConnWatcher iob; EventLoop_t *loop; @@ -169,7 +169,7 @@ class ControlConn : private ServiceListener { bad_conn_close = true; oom_close = true; - iob.setWatchFlags(out_events); + iob.setWatchFlags(OUT_EVENTS); } // Process service event broadcast. @@ -206,7 +206,7 @@ class ControlConn : private ServiceListener public: ControlConn(EventLoop_t * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0) { - iob.registerWith(loop, fd, in_events); + iob.registerWith(loop, fd, IN_EVENTS); active_control_conns++; } @@ -216,19 +216,17 @@ class ControlConn : private ServiceListener }; -static dasync::Rearm control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents) +static Rearm control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents) { - using Rearm = dasync::Rearm; - char * cc_addr = (reinterpret_cast(watcher)) - offsetof(ControlConn, iob); ControlConn *conn = reinterpret_cast(cc_addr); - if (revents & in_events) { + if (revents & IN_EVENTS) { if (conn->dataReady()) { delete conn; return Rearm::REMOVED; } } - if (revents & out_events) { + if (revents & OUT_EVENTS) { if (conn->sendData()) { delete conn; return Rearm::REMOVED; diff --git a/src/dasync/dasync-childproc.h b/src/dasync/dasync-childproc.h deleted file mode 100644 index 990fbb1..0000000 --- a/src/dasync/dasync-childproc.h +++ /dev/null @@ -1,141 +0,0 @@ -namespace dasync { - -// Map of pid_t to void *, with possibility of reserving entries so that mappings can -// be later added with no danger of allocator exhaustion (bad_alloc). -class pid_map -{ - using pair = std::pair; - std::unordered_map base_map; - std::vector backup_vector; - - // Number of entries in backup_vector that are actually in use (as opposed - // to simply reserved): - int backup_size = 0; - - public: - using entry = std::pair; - - entry get(pid_t key) noexcept - { - auto it = base_map.find(key); - if (it == base_map.end()) { - // Not in map; look in vector - for (int i = 0; i < backup_size; i++) { - if (backup_vector[i].first == key) { - return entry(true, backup_vector[i].second); - } - } - - return entry(false, nullptr); - } - - return entry(true, it->second); - } - - entry erase(pid_t key) noexcept - { - auto iter = base_map.find(key); - if (iter != base_map.end()) { - entry r(true, iter->second); - base_map.erase(iter); - return r; - } - for (int i = 0; i < backup_size; i++) { - if (backup_vector[i].first == key) { - entry r(true, backup_vector[i].second); - backup_vector.erase(backup_vector.begin() + i); - return r; - } - } - return entry(false, nullptr); - } - - // Throws bad_alloc on reservation failure - void reserve() - { - backup_vector.resize(backup_vector.size() + 1); - } - - void add(pid_t key, void *val) // throws std::bad_alloc - { - base_map[key] = val; - } - - void add_from_reserve(pid_t key, void *val) noexcept - { - try { - base_map[key] = val; - backup_vector.resize(backup_vector.size() - 1); - } - catch (std::bad_alloc &) { - // We couldn't add into the map, use the reserve: - backup_vector[backup_size++] = pair(key, val); - } - } -}; - -namespace { - void sigchld_handler(int signum) - { - // If SIGCHLD has no handler (is ignored), SIGCHLD signals will - // not be queued for terminated child processes. (On Linux, the - // default disposition for SIGCHLD is to be ignored but *not* have - // this behavior, which seems inconsistent. Setting a handler doesn't - // hurt in any case). - } -} - -template class ChildProcEvents : public Base -{ - private: - pid_map child_waiters; - - using SigInfo = typename Base::SigInfo; - - protected: - void receiveSignal(SigInfo &siginfo, void *userdata) - { - if (siginfo.get_signo() == SIGCHLD) { - int status; - pid_t child; - while ((child = waitpid(-1, &status, WNOHANG)) > 0) { - pid_map::entry ent = child_waiters.erase(child); - if (ent.first) { - Base::receiveChildStat(child, status, ent.second); - } - } - } - else { - Base::receiveSignal(siginfo, userdata); - } - } - - public: - void reserveChildWatch() - { - child_waiters.reserve(); - } - - void addChildWatch(pid_t child, void *val) - { - child_waiters.add(child, val); - } - - void addReservedChildWatch(pid_t child, void *val) noexcept - { - child_waiters.add_from_reserve(child, val); - } - - template void init(T *loop_mech) - { - loop_mech->addSignalWatch(SIGCHLD, nullptr); - struct sigaction chld_action; - chld_action.sa_handler = sigchld_handler; - sigemptyset(&chld_action.sa_mask); - chld_action.sa_flags = 0; - sigaction(SIGCHLD, &chld_action, nullptr); - } -}; - - -} // end namespace diff --git a/src/dasync/dasync-epoll.h b/src/dasync/dasync-epoll.h deleted file mode 100644 index 05cf5aa..0000000 --- a/src/dasync/dasync-epoll.h +++ /dev/null @@ -1,327 +0,0 @@ -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include -#include - -namespace dasync { - -template class EpollLoop; - -class EpollTraits -{ - template friend class EpollLoop; - - public: - - class SigInfo - { - template friend class EpollLoop; - - struct signalfd_siginfo info; - - public: - int get_signo() { return info.ssi_signo; } - int get_sicode() { return info.ssi_code; } - int get_siint() { return info.ssi_int; } - int get_ssiptr() { return info.ssi_ptr; } - int get_ssiaddr() { return info.ssi_addr; } - - void set_signo(int signo) { info.ssi_signo = signo; } - }; - - class FD_r; - - // File descriptor optional storage. If the mechanism can return the file descriptor, this - // class will be empty, otherwise it can hold a file descriptor. - class FD_s { - friend class FD_r; - - // Epoll doesn't return the file descriptor (it can, but it can't return both file - // descriptor and user data). - int fd; - }; - - // File descriptor reference (passed to event callback). If the mechanism can return the - // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor - // must be stored in an FD_s instance. - class FD_r { - public: - int getFd(FD_s ss) - { - return ss.fd; - } - }; - - const static bool has_bidi_fd_watch = true; - const static bool has_separate_rw_fd_watches = false; -}; - - -template class EpollLoop : public Base -{ - int epfd; // epoll fd - int sigfd; // signalfd fd; -1 if not initialised - sigset_t sigmask; - - std::unordered_map sigdataMap; - - // Base contains: - // lock - a lock that can be used to protect internal structure. - // receive*() methods will be called with lock held. - // receiveSignal(SigInfo &, user *) noexcept - // receiveFdEvent(FD_r, user *, int flags) noexcept - - using SigInfo = EpollTraits::SigInfo; - using FD_r = typename EpollTraits::FD_r; - - void processEvents(epoll_event *events, int r) - { - std::lock_guard guard(Base::lock); - - for (int i = 0; i < r; i++) { - void * ptr = events[i].data.ptr; - - if (ptr == &sigfd) { - // Signal - SigInfo siginfo; - while (true) { - int r = read(sigfd, &siginfo.info, sizeof(siginfo.info)); - if (r == -1) break; - if (siginfo.get_signo() != SIGCHLD) { - // TODO remove the special exception for SIGCHLD? - sigdelset(&sigmask, siginfo.get_signo()); - } - auto iter = sigdataMap.find(siginfo.get_signo()); - if (iter != sigdataMap.end()) { - void *userdata = (*iter).second; - Base::receiveSignal(siginfo, userdata); - } - } - signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); - } - else { - int flags = 0; - (events[i].events & EPOLLIN) && (flags |= in_events); - (events[i].events & EPOLLHUP) && (flags |= in_events); - (events[i].events & EPOLLOUT) && (flags |= out_events); - (events[i].events & EPOLLERR) && (flags |= err_events); - Base::receiveFdEvent(*this, FD_r(), ptr, flags); - } - } - } - - public: - - /** - * EpollLoop constructor. - * - * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised. - */ - EpollLoop() : sigfd(-1) - { - epfd = epoll_create1(EPOLL_CLOEXEC); - if (epfd == -1) { - throw std::system_error(errno, std::system_category()); - } - sigemptyset(&sigmask); - Base::init(this); - } - - ~EpollLoop() - { - close(epfd); - if (sigfd != -1) { - close(sigfd); - } - } - - // flags: in_events | out_events - void addFdWatch(int fd, void *userdata, int flags) - { - struct epoll_event epevent; - // epevent.data.fd = fd; - epevent.data.ptr = userdata; - epevent.events = 0; - - if (flags & one_shot) { - epevent.events = EPOLLONESHOT; - } - if (flags & in_events) { - epevent.events |= EPOLLIN; - } - if (flags & out_events) { - epevent.events |= EPOLLOUT; - } - - if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epevent) == -1) { - throw new std::system_error(errno, std::system_category()); - } - } - - void removeFdWatch(int fd, int flags) noexcept - { - epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr); - } - - void removeFdWatch_nolock(int fd, int flags) noexcept - { - removeFdWatch(fd, flags); - } - - // Note this will *replace* the old flags with the new, that is, - // it can enable *or disable* read/write events. - void enableFdWatch(int fd, void *userdata, int flags) noexcept - { - struct epoll_event epevent; - // epevent.data.fd = fd; - epevent.data.ptr = userdata; - epevent.events = 0; - - if (flags & one_shot) { - epevent.events = EPOLLONESHOT; - } - if (flags & in_events) { - epevent.events |= EPOLLIN; - } - if (flags & out_events) { - epevent.events |= EPOLLOUT; - } - - if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) { - // Shouldn't be able to fail - // throw new std::system_error(errno, std::system_category()); - } - } - - void enableFdWatch_nolock(int fd, void *userdata, int flags) - { - enableFdWatch(fd, userdata, flags); - } - - void disableFdWatch(int fd, int flags) noexcept - { - struct epoll_event epevent; - // epevent.data.fd = fd; - epevent.data.ptr = nullptr; - epevent.events = 0; - - // Epoll documentation says that hangup will still be reported, need to check - // whether this is really the case. Suspect it is really only the case if - // EPOLLIN is set. - if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) { - // Let's assume that this can't fail. - // throw new std::system_error(errno, std::system_category()); - } - } - - void disableFdWatch_nolock(int fd, int flags) noexcept - { - disableFdWatch(fd, flags); - } - - // Note signal should be masked before call. - void addSignalWatch(int signo, void *userdata) - { - std::lock_guard guard(Base::lock); - - sigdataMap[signo] = userdata; - - // Modify the signal fd to watch the new signal - bool was_no_sigfd = (sigfd == -1); - sigaddset(&sigmask, signo); - sigfd = signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); - if (sigfd == -1) { - throw new std::system_error(errno, std::system_category()); - } - - if (was_no_sigfd) { - // Add the signalfd to the epoll set. - struct epoll_event epevent; - epevent.data.ptr = &sigfd; - epevent.events = EPOLLIN; - // No need for EPOLLONESHOT - we can pull the signals out - // as we see them. - if (epoll_ctl(epfd, EPOLL_CTL_ADD, sigfd, &epevent) == -1) { - close(sigfd); - throw new std::system_error(errno, std::system_category()); - } - } - } - - // Note, called with lock held: - void rearmSignalWatch_nolock(int signo) noexcept - { - sigaddset(&sigmask, signo); - signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); - } - - void removeSignalWatch_nolock(int signo) noexcept - { - sigdelset(&sigmask, signo); - signalfd(sigfd, &sigmask, 0); - } - - void removeSignalWatch(int signo) noexcept - { - std::lock_guard guard(Base::lock); - removeSignalWatch_nolock(signo); - } - - // If events are pending, process an unspecified number of them. - // If no events are pending, wait until one event is received and - // process this event (and possibly any other events received - // simultaneously). - // If processing an event removes a watch, there is a possibility - // that the watched event will still be reported (if it has - // occurred) before pullEvents() returns. - // - // do_wait - if false, returns immediately if no events are - // pending. - void pullEvents(bool do_wait) - { - epoll_event events[16]; - int r = epoll_wait(epfd, events, 16, do_wait ? -1 : 0); - if (r == -1 || r == 0) { - // signal or no events - return; - } - - processEvents(events, r); - } - - // If events are pending, process one of them. - // If no events are pending, wait until one event is received and - // process this event. - // - // do_wait - if false, returns immediately if no events are - // pending. - void pullOneEvent(bool do_wait) - { - epoll_event events[1]; - int r = epoll_wait(epfd, events, 1, do_wait ? -1 : 0); - if (r == -1 || r == 0) { - // signal or no events - return; - } - - processEvents(events, r); - } - - // Interrupt any current poll operation (pullEvents/pullOneEvent), causing - // it to to return immediately. - void interruptWait() - { - // TODO - } -}; - -} // end namespace diff --git a/src/dasync/dasync-flags.h b/src/dasync/dasync-flags.h deleted file mode 100644 index 441c34a..0000000 --- a/src/dasync/dasync-flags.h +++ /dev/null @@ -1,13 +0,0 @@ -namespace dasync { - -// Event type bits -constexpr unsigned int in_events = 1; -constexpr unsigned int out_events = 2; -constexpr unsigned int err_events = 4; - -constexpr unsigned int one_shot = 8; - -// Masks: -constexpr unsigned int IO_EVENTS = in_events | out_events; - -} diff --git a/src/dasync/dasync-kqueue.h b/src/dasync/dasync-kqueue.h deleted file mode 100644 index fb6d9da..0000000 --- a/src/dasync/dasync-kqueue.h +++ /dev/null @@ -1,371 +0,0 @@ -#include -#include -#include -#include -#include - -#ifdef __OpenBSD__ -#include // for __thrsigdivert aka sigtimedwait -#include -extern "C" { - int __thrsigdivert(sigset_t set, siginfo_t *info, const struct timespec * timeout); -} -#endif - -#include -#include -#include -#include - -#include -#include - -namespace dasync { - -template class KqueueLoop; - -class KqueueTraits -{ - template friend class KqueueLoop; - - public: - - class SigInfo - { - template friend class KqueueLoop; - - siginfo_t info; - - public: - int get_signo() { return info.si_signo; } - int get_sicode() { return info.si_code; } - char * get_ssiaddr() { return info.si_addr; } - - void set_signo(int signo) { info.si_signo = signo; } - }; - - class FD_r; - - // File descriptor optional storage. If the mechanism can return the file descriptor, this - // class will be empty, otherwise it can hold a file descriptor. - class FD_s { - // Epoll doesn't return the file descriptor (it can, but it can't return both file - // descriptor and user data). - // TODO make true empty. - }; - - // File descriptor reference (passed to event callback). If the mechanism can return the - // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor - // must be stored in an FD_s instance. - class FD_r { - int fd; - public: - int getFd(FD_s ss) - { - return fd; - } - FD_r(int nfd) : fd(nfd) - { - } - }; - - const static bool has_separate_rw_fd_watches = true; -}; - -#if defined(__OpenBSD__) -// OpenBSD has no sigtimedwait (or sigwaitinfo) but does have "__thrsigdivert", which is -// essentially an incomplete version of the same thing. Discussion with OpenBSD developer -// Ted Unangst suggested that the siginfo_t structure returned might not always have all -// fields set correctly. Furthermore there is a bug such that specifying a zero timeout (or -// indeed any timeout less than a tick) results in NO timeout. -static inline int sigtimedwait(const sigset_t *ssp, siginfo_t *info, struct timespec *timeout) -{ - // We know that we're only called with a timeout of 0 (which doesn't work properly) and - // that we safely overwrite the timeout. So, we set tv_nsec to an invalid value, which - // will cause EINVAL to be returned, but will still pick up any pending signals *first*. - timeout->tv_nsec = 1000000001; - return __thrsigdivert(*ssp, info, timeout); -} -#endif - -template class KqueueLoop : public Base -{ - int kqfd; // kqueue fd - sigset_t sigmask; // enabled signal watch mask - - // Map of signal number to user data pointer. If kqueue had been better thought-through, - // we shouldn't need this. Although we can associate user data with an EVFILT_SIGNAL kqueue - // filter, the problem is that the kqueue signal report *coexists* with the regular signal - // delivery mechanism without having any connection to it. Whereas regular signals can be - // queued (especially "realtime" signals, via sigqueue()), kqueue just maintains a counter - // of delivery attempts and clears this when we read the event. What this means is that - // kqueue won't necessarily tell us if signals are pending, in the case that: - // 1) it already reported the attempted signal delivery and - // 2) more than one of the same signal was pending at that time and - // 3) no more deliveries of the same signal have been attempted in the meantime. - // Of course, if kqueue doesn't report the signal, then it doesn't give us the data associated - // with the event, so we need to maintain that separately too: - std::unordered_map sigdataMap; - - // Base contains: - // lock - a lock that can be used to protect internal structure. - // receive*() methods will be called with lock held. - // receiveSignal(SigInfo &, user *) noexcept - // receiveFdEvent(FD_r, user *, int flags) noexcept - - using SigInfo = KqueueTraits::SigInfo; - using FD_r = typename KqueueTraits::FD_r; - - void processEvents(struct kevent *events, int r) - { - std::lock_guard guard(Base::lock); - - for (int i = 0; i < r; i++) { - if (events[i].filter == EVFILT_SIGNAL) { - SigInfo siginfo; - sigset_t sset; - sigemptyset(&sset); - sigaddset(&sset, events[i].ident); - struct timespec timeout; - timeout.tv_sec = 0; - timeout.tv_nsec = 0; - if (sigtimedwait(&sset, &siginfo.info, &timeout) > 0) { - Base::receiveSignal(siginfo, (void *)events[i].udata); - } - - if (events[i].ident != SIGCHLD) { - sigdelset(&sigmask, events[i].ident); - events[i].flags = EV_DISABLE; - } - else { - // TODO can we remove this SIGCHLD hack? - events[i].flags = EV_ENABLE; - } - } - else if (events[i].filter == EVFILT_READ || events[i].filter == EVFILT_WRITE) { - int flags = events[i].filter == EVFILT_READ ? in_events : out_events; - Base::receiveFdEvent(*this, FD_r(events[i].ident), events[i].udata, flags); - events[i].flags = EV_DISABLE | EV_CLEAR; - // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for - // another connection). - } - else { - events[i].flags = EV_DISABLE; - } - } - - // Now we disable all received events, to simulate EV_DISPATCH: - kevent(kqfd, events, r, nullptr, 0, nullptr); - } - - public: - - /** - * KqueueLoop constructor. - * - * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised. - */ - KqueueLoop() - { - kqfd = kqueue(); - if (kqfd == -1) { - throw std::system_error(errno, std::system_category()); - } - sigemptyset(&sigmask); - Base::init(this); - } - - ~KqueueLoop() - { - close(kqfd); - } - - void setFilterEnabled(short filterType, uintptr_t ident, bool enable) - { - struct kevent kev; - EV_SET(&kev, ident, filterType, enable ? EV_ENABLE : EV_DISABLE, 0, 0, 0); - kevent(kqfd, &kev, 1, nullptr, 0, nullptr); - } - - void removeFilter(short filterType, uintptr_t ident) - { - struct kevent kev; - EV_SET(&kev, ident, filterType, EV_DELETE, 0, 0, 0); - kevent(kqfd, &kev, 1, nullptr, 0, nullptr); - } - - // flags: in_events | out_events - void addFdWatch(int fd, void *userdata, int flags) - { - // TODO kqueue doesn't support EVFILE_WRITE on file fd's :/ - - short filter = (flags & in_events) ? EVFILT_READ : EVFILT_WRITE; - - struct kevent kev; - EV_SET(&kev, fd, filter, EV_ADD, 0, 0, userdata); - if (kevent(kqfd, &kev, 1, nullptr, 0, nullptr) == -1) { - throw new std::system_error(errno, std::system_category()); - } - } - - void removeFdWatch(int fd, int flags) - { - removeFilter((flags & in_events) ? EVFILT_READ : EVFILT_WRITE, fd); - } - - void removeFdWatch_nolock(int fd, int flags) - { - removeFdWatch(fd, flags); - } - - void enableFdWatch(int fd, void *userdata, int flags) - { - setFilterEnabled((flags & in_events) ? EVFILT_READ : EVFILT_WRITE, fd, true); - } - - void enableFdWatch_nolock(int fd, void *userdata, int flags) - { - enableFdWatch(fd, userdata, flags); - } - - void disableFdWatch(int fd, int flags) - { - setFilterEnabled((flags & in_events) ? EVFILT_READ : EVFILT_WRITE, fd, false); - } - - void disableFdWatch_nolock(int fd, int flags) - { - disableFdWatch(fd, flags); - } - - // Note signal should be masked before call. - void addSignalWatch(int signo, void *userdata) - { - std::lock_guard guard(Base::lock); - - sigdataMap[signo] = userdata; - sigaddset(&sigmask, signo); - - struct kevent evt; - EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ADD, 0, 0, userdata); - // TODO use EV_DISPATCH if available (not on OpenBSD) - - if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) { - throw new std::system_error(errno, std::system_category()); - } - } - - // Note, called with lock held: - void rearmSignalWatch_nolock(int signo) noexcept - { - sigaddset(&sigmask, signo); - - struct kevent evt; - EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ENABLE, 0, 0, 0); - // TODO use EV_DISPATCH if available (not on OpenBSD) - - kevent(kqfd, &evt, 1, nullptr, 0, nullptr); - } - - void removeSignalWatch_nolock(int signo) noexcept - { - sigdelset(&sigmask, signo); - - struct kevent evt; - EV_SET(&evt, signo, EVFILT_SIGNAL, EV_DELETE, 0, 0, 0); - - kevent(kqfd, &evt, 1, nullptr, 0, nullptr); - } - - void removeSignalWatch(int signo) noexcept - { - std::lock_guard guard(Base::lock); - removeSignalWatch_nolock(signo); - } - - // If events are pending, process an unspecified number of them. - // If no events are pending, wait until one event is received and - // process this event (and possibly any other events received - // simultaneously). - // If processing an event removes a watch, there is a possibility - // that the watched event will still be reported (if it has - // occurred) before pullEvents() returns. - // - // do_wait - if false, returns immediately if no events are - // pending. - void pullEvents(bool do_wait) - { - // We actually need to check pending signals, since - // kqueue can count signals as they are delivered but the count is - // cleared when we poll the kqueue, meaning that signals might still - // be pending if they were queued multiple times at the last poll. - - // TODO we should only poll for signals that *have* been reported - // as being raised more than once prior via kevent, rather than all - // signals that have been registered - in many cases that will allow - // us to skip the sigtimedwait call altogether. - - { - std::lock_guard guard(Base::lock); - struct timespec timeout; - timeout.tv_sec = 0; - timeout.tv_nsec = 0; - SigInfo siginfo; - int rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout); - while (rsigno > 0) { - // TODO avoid this hack for SIGCHLD somehow - if (rsigno != SIGCHLD) { - sigdelset(&sigmask, rsigno); - // TODO accumulate and disable multiple filters with a single kevents call - // rather than disabling each individually - setFilterEnabled(EVFILT_SIGNAL, rsigno, false); - } - Base::receiveSignal(siginfo, sigdataMap[rsigno]); - rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout); - } - } - - struct kevent events[16]; - struct timespec ts; - ts.tv_sec = 0; - ts.tv_nsec = 0; - int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts); - if (r == -1 || r == 0) { - // signal or no events - return; - } - - processEvents(events, r); - } - - // If events are pending, process one of them. - // If no events are pending, wait until one event is received and - // process this event. - // - // do_wait - if false, returns immediately if no events are - // pending. - void pullOneEvent(bool do_wait) - { - // TODO must check for pending signals as per pullEvents() - struct kevent events[1]; - struct timespec ts; - ts.tv_sec = 0; - ts.tv_nsec = 0; - int r = kevent(kqfd, nullptr, 0, events, 1, do_wait ? nullptr : &ts); - if (r == -1 || r == 0) { - // signal or no events - return; - } - - processEvents(events, r); - } - - // Interrupt any current poll operation (pullEvents/pullOneEvent), causing - // it to to return immediately. - void interruptWait() - { - // TODO - } -}; - -} // end namespace diff --git a/src/dasync/dasync.h b/src/dasync/dasync.h deleted file mode 100644 index 3602f01..0000000 --- a/src/dasync/dasync.h +++ /dev/null @@ -1,1213 +0,0 @@ -#ifndef DASYNC_H_INCLUDED -#define DASYNC_H_INCLUDED - -#if defined(__OpenBSD__) -#define HAVE_KQUEUE 1 -#endif - -#if defined(__linux__) -#define HAVE_EPOLL 1 -#endif - -#include "dasync-flags.h" - -#if defined(HAVE_KQUEUE) -#include "dasync-kqueue.h" -#include "dasync-childproc.h" -namespace dasync { - template using Loop = KqueueLoop; - using LoopTraits = KqueueTraits; -} -#elif defined(HAVE_EPOLL) -#include "dasync-epoll.h" -#include "dasync-childproc.h" -namespace dasync { - template using Loop = EpollLoop; - using LoopTraits = EpollTraits; -} -#endif -#include -#include -#include -#include - -#include "dmutex.h" - - - -// TODO consider using atomic variables instead of explicit locking where appropriate - -// Allow optimisation of empty classes by including this in the body: -// May be included as the last entry for a class which is only -// _potentially_ empty. - -/* -#ifdef __GNUC__ -#ifdef __clang__ -#define EMPTY_BODY private: char empty_fill[0]; -#else -#define EMPTY_BODY private: char empty_fill[0]; -#endif -#else -#define EMPTY_BODY -#endif -*/ - -namespace dasync { - - -/** - * Values for rearm/disarm return from event handlers - */ -enum class Rearm -{ - /** Re-arm the event watcher so that it receives further events */ - REARM, - /** Disarm the event watcher so that it receives no further events, until it is re-armed explicitly */ - DISARM, - /** Leave in current armed/disarmed state */ - NOOP, - /** Remove the event watcher (and call "removed" callback) */ - REMOVE, - /** The watcher has been removed - don't touch it! */ - REMOVED -// TODO: add a REQUEUE option, which means, "I didn't complete input/output, run me again soon" -}; - - -// Forward declarations: -template class EventLoop; -template class PosixFdWatcher; -template class PosixBidiFdWatcher; -template class PosixSignalWatcher; -template class PosixChildWatcher; - -// Information about a received signal. -// This is essentially a wrapper for the POSIX siginfo_t; its existence allows for mechanisms that receive -// equivalent signal information in a different format (eg signalfd on Linux). -using SigInfo = LoopTraits::SigInfo; - -namespace dprivate { - // (non-public API) - - enum class WatchType - { - SIGNAL, - FD, - CHILD, - SECONDARYFD - }; - - template class EventDispatch; - - // For FD watchers: - // Use this watch flag to indicate that in and out events should be reported separately, - // that is, watcher should not be disabled until all watched event types are queued. - constexpr static int multi_watch = 4; - - // Represents a queued event notification - class BaseWatcher - { - template friend class EventDispatch; - template friend class dasync::EventLoop; - - protected: - WatchType watchType; - int active : 1; - int deleteme : 1; - - BaseWatcher * prev; - BaseWatcher * next; - - public: - - // Perform initialisation necessary before registration with an event loop - void init() - { - active = false; - deleteme = false; - prev = nullptr; - next = nullptr; - } - - BaseWatcher(WatchType wt) noexcept : watchType(wt) { } - - virtual ~BaseWatcher() noexcept { } - - // Called when the watcher has been removed. - // It is guaranteed by the caller that: - // - the dispatch method is not currently running - // - the dispatch method will not be called. - virtual void watchRemoved() noexcept - { - // TODO this "delete" behaviour could be dependent on a flag, perhaps? - // delete this; - } - }; - - // Base signal event - not part of public API - template - class BaseSignalWatcher : public BaseWatcher - { - template friend class EventDispatch; - friend class dasync::EventLoop; - - protected: - SigInfo siginfo; - BaseSignalWatcher() : BaseWatcher(WatchType::SIGNAL) { } - - public: - typedef SigInfo &SigInfo_p; - - virtual Rearm gotSignal(EventLoop * eloop, int signo, SigInfo_p siginfo) = 0; - }; - - template - class BaseFdWatcher : public BaseWatcher - { - template friend class EventDispatch; - friend class dasync::EventLoop; - - protected: - int watch_fd; - int watch_flags; - int event_flags; - - BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { } - - public: - virtual Rearm gotEvent(EventLoop * eloop, int fd, int flags) = 0; - }; - - template - class BaseBidiFdWatcher : public BaseFdWatcher - { - template friend class EventDispatch; - friend class dasync::EventLoop; - - // This should never actually get called: - Rearm gotEvent(EventLoop * eloop, int fd, int flags) final - { - return Rearm::REARM; // should not be reachable. - }; - - protected: - - // The main instance is the "input" watcher only; we keep a secondary watcher - // with a secondary set of flags for the "output" watcher: - BaseWatcher outWatcher = BaseWatcher(WatchType::SECONDARYFD); - - int read_removed : 1; // read watch removed? - int write_removed : 1; // write watch removed? - - public: - virtual Rearm readReady(EventLoop * eloop, int fd) noexcept = 0; - virtual Rearm writeReady(EventLoop * eloop, int fd) noexcept = 0; - }; - - template - class BaseChildWatcher : public BaseWatcher - { - template friend class EventDispatch; - friend class dasync::EventLoop; - - protected: - pid_t watch_pid; - int child_status; - - BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { } - - public: - virtual void gotTermStat(EventLoop * eloop, pid_t child, int status) = 0; - }; - - // Classes for implementing a fair(ish) wait queue. - // A queue node can be signalled when it reaches the head of - // the queue. - - template class waitqueue; - template class waitqueue_node; - - // Select an appropriate conditiona variable type for a mutex: - // condition_variable if mutex is std::mutex, or condition_variable_any - // otherwise. - template class condvarSelector; - - template <> class condvarSelector - { - public: - typedef std::condition_variable condvar; - }; - - template class condvarSelector - { - public: - typedef std::condition_variable_any condvar; - }; - - template <> class waitqueue_node - { - // Specialised waitqueue_node for NullMutex. - // TODO can this be reduced to 0 data members? - friend class waitqueue; - waitqueue_node * next = nullptr; - - public: - void wait(std::unique_lock &ul) { } - void signal() { } - }; - - template class waitqueue_node - { - typename condvarSelector::condvar condvar; - friend class waitqueue; - waitqueue_node * next = nullptr; - - public: - void signal() - { - condvar.notify_one(); - } - - void wait(std::unique_lock &mutex_lock) - { - condvar.wait(mutex_lock); - } - }; - - template class waitqueue - { - waitqueue_node * tail = nullptr; - waitqueue_node * head = nullptr; - - public: - waitqueue_node * unqueue() - { - head = head->next; - return head; - } - - waitqueue_node * getHead() - { - return head; - } - - void queue(waitqueue_node *node) - { - if (tail) { - tail->next = node; - } - else { - head = node; - } - } - }; - - // This class serves as the base class (mixin) for the AEN mechanism class. - // Note that EventDispatch, here, and EventLoop (below) are really two sides of one coin; - // they do not work independently. The mixin pattern that we use to avoid dynamic dispatch - // forces them to be two seperate classes, however. - // - // The EventDispatch class maintains the queued event data structures. It inserts watchers - // into the queue when eventes are received (receiveXXX methods). - template class EventDispatch : public Traits - { - friend class EventLoop; - - // queue data structure/pointer - BaseWatcher * first; - - using BaseSignalWatcher = dasync::dprivate::BaseSignalWatcher; - using BaseFdWatcher = dasync::dprivate::BaseFdWatcher; - using BaseBidiFdWatcher = dasync::dprivate::BaseBidiFdWatcher; - using BaseChildWatcher = dasync::dprivate::BaseChildWatcher; - - void queueWatcher(BaseWatcher *bwatcher) - { - // Put in queue: - if (first == nullptr) { - bwatcher->prev = bwatcher; - bwatcher->next = bwatcher; - first = bwatcher; - } - else { - first->prev->next = bwatcher; - bwatcher->prev = first->prev; - first->prev = bwatcher; - bwatcher->next = first; - } - } - - bool isQueued(BaseWatcher *bwatcher) - { - return bwatcher->prev != nullptr; - } - - void dequeueWatcher(BaseWatcher *bwatcher) - { - if (bwatcher->prev == bwatcher) { - // Only item in queue - first = nullptr; - } - else { - if (first == bwatcher) first = first->next; - bwatcher->prev->next = bwatcher->next; - bwatcher->next->prev = bwatcher->prev; - } - - bwatcher->prev = nullptr; - bwatcher->next = nullptr; - } - - protected: - T_Mutex lock; - - void receiveSignal(typename Traits::SigInfo & siginfo, void * userdata) - { - BaseSignalWatcher * bwatcher = static_cast(userdata); - bwatcher->siginfo = siginfo; - queueWatcher(bwatcher); - } - - template - void receiveFdEvent(T &loop_mech, typename Traits::FD_r fd_r, void * userdata, int flags) - { - BaseFdWatcher * bfdw = static_cast(userdata); - - bfdw->event_flags |= flags; - - BaseWatcher * bwatcher = bfdw; - - bool is_multi_watch = bfdw->watch_flags & multi_watch; - if (is_multi_watch) { - BaseBidiFdWatcher *bbdw = static_cast(bwatcher); - if (flags & in_events && flags & out_events) { - // Queue the secondary watcher first: - queueWatcher(&bbdw->outWatcher); - } - else if (flags & out_events) { - // Use the secondary watcher for queueing: - bwatcher = &(bbdw->outWatcher); - } - } - - queueWatcher(bwatcher); - - if (! LoopTraits::has_separate_rw_fd_watches) { - // If this is a bidirectional fd-watch, it has been disabled in *both* directions - // as the event was delivered. However, the other direction should not be disabled - // yet, so we need to re-enable: - int in_out_mask = in_events | out_events; - if (is_multi_watch && bfdw->event_flags != (bfdw->watch_flags & in_out_mask)) { - // We need to re-enable the other channel now: - loop_mech.enableFdWatch_nolock(bfdw->watch_fd, userdata, - (bfdw->watch_flags & ~(bfdw->event_flags)) | one_shot); - } - } - } - - void receiveChildStat(pid_t child, int status, void * userdata) - { - BaseChildWatcher * watcher = static_cast(userdata); - watcher->child_status = status; - queueWatcher(watcher); - } - - // Pull a single event from the queue - BaseWatcher * pullEvent() - { - BaseWatcher * r = first; - if (r != nullptr) { - dequeueWatcher(r); - } - return r; - } - - void issueDelete(BaseWatcher *watcher) noexcept - { - // This is only called when the attention lock is held, so if the watcher is not - // active/queued now, it cannot become active (and will not be reported with an event) - // during execution of this function. - - lock.lock(); - - // TODO this needs to handle multi-watch (BidiFdWatcher) properly - - if (watcher->active) { - // If the watcher is active, set deleteme true; the watcher will be removed - // at the end of current processing (i.e. when active is set false). - watcher->deleteme = true; - lock.unlock(); - } - else { - // Actually do the delete. - if (isQueued(watcher)) { - dequeueWatcher(watcher); - } - - lock.unlock(); - watcher->watchRemoved(); - } - } - - void issueDelete(BaseBidiFdWatcher *watcher) noexcept - { - lock.lock(); - - if (watcher->active) { - watcher->deleteme = true; - } - else { - if (isQueued(watcher)) { - dequeueWatcher(watcher); - } - - watcher->read_removed = true; - } - - BaseWatcher *secondary = &(watcher->outWatcher); - if (secondary->active) { - secondary->deleteme = true; - } - else { - if (isQueued(secondary)) { - dequeueWatcher(secondary); - } - - watcher->write_removed = true; - } - - if (watcher->read_removed && watcher->write_removed) { - lock.unlock(); - watcher->watchRemoved(); - } - else { - lock.unlock(); - } - } - }; -} - - -template class EventLoop -{ - friend class PosixFdWatcher; - friend class PosixBidiFdWatcher; - friend class PosixSignalWatcher; - friend class PosixChildWatcher; - - template using EventDispatch = dprivate::EventDispatch; - template using waitqueue = dprivate::waitqueue; - template using waitqueue_node = dprivate::waitqueue_node; - using BaseWatcher = dprivate::BaseWatcher; - using BaseSignalWatcher = dprivate::BaseSignalWatcher; - using BaseFdWatcher = dprivate::BaseFdWatcher; - using BaseBidiFdWatcher = dprivate::BaseBidiFdWatcher; - using BaseChildWatcher = dprivate::BaseChildWatcher; - using WatchType = dprivate::WatchType; - - Loop>> loop_mech; - - // There is a complex problem with most asynchronous event notification mechanisms - // when used in a multi-threaded environment. Generally, a file descriptor or other - // event type that we are watching will be associated with some data used to manage - // that event source. For example a web server needs to maintain information about - // each client connection, such as the state of the connection (what protocol version - // has been negotiated, etc; if a transfer is taking place, what file is being - // transferred etc). - // - // However, sometimes we want to remove an event source (eg webserver wants to drop - // a connection) and delete the associated data. The problem here is that it is - // difficult to be sure when it is ok to actually remove the data, since when - // requesting to unwatch the source in one thread it is still possible that an - // event from that source is just being reported to another thread (in which case - // the data will be needed). - // - // To solve that, we: - // - allow only one thread to poll for events at a time, using a lock - // - use the same lock to prevent polling, if we want to unwatch an event source - // - generate an event to interrupt any polling that may already be occurring in - // another thread - // - mark handlers as active if they are currently executing, and - // - when removing an active handler, simply set a flag which causes it to be - // removed once the current processing is finished, rather than removing it - // immediately. - // - // In particular the lock mechanism for preventing multiple threads polling and - // for allowing polling to be interrupted is tricky. We can't use a simple mutex - // since there is significant chance that it will be highly contended and there - // are no guarantees that its acquisition will be fair. In particular, we don't - // want a thread that is trying to unwatch a source being starved while another - // thread polls the event source. - // - // So, we use two wait queues protected by a single mutex. The "attn_waitqueue" - // (attention queue) is the high-priority queue, used for threads wanting to - // unwatch event sources. The "wait_waitquueue" is the queue used by threads - // that wish to actually poll for events. - // - The head of the "attn_waitqueue" is always the holder of the lock - // - Therefore, a poll-waiter must be moved from the wait_waitqueue to the - // attn_waitqueue to actually gain the lock. This is only done if the - // attn_waitqueue is otherwise empty. - // - The mutex only protects manipulation of the wait queues, and so should not - // be highly contended. - - T_Mutex wait_lock; // wait lock, used to prevent multiple threads from waiting - // on the event queue simultaneously. - waitqueue attn_waitqueue; - waitqueue wait_waitqueue; - - T_Mutex &getBaseLock() - { - return loop_mech.lock; - } - - void registerSignal(BaseSignalWatcher *callBack, int signo) - { - loop_mech.addSignalWatch(signo, callBack); - } - - void deregister(BaseSignalWatcher *callBack, int signo) noexcept - { - loop_mech.removeSignalWatch(signo); - - waitqueue_node qnode; - getAttnLock(qnode); - - EventDispatch & ed = (EventDispatch &) loop_mech; - ed.issueDelete(callBack); - - releaseLock(qnode); - } - - void registerFd(BaseFdWatcher *callback, int fd, int eventmask) - { - loop_mech.addFdWatch(fd, callback, eventmask | one_shot); - } - - void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask) - { - if (LoopTraits::has_separate_rw_fd_watches) { - // TODO - } - else { - loop_mech.addFdWatch(fd, callback, eventmask | one_shot); - } - } - - void setFdEnabled(BaseWatcher *watcher, int fd, int watch_flags, bool enabled) - { - if (enabled) { - loop_mech.enableFdWatch(fd, watcher, watch_flags | one_shot); - } - else { - loop_mech.disableFdWatch(fd, watch_flags); - } - } - - void setFdEnabled_nolock(BaseWatcher *watcher, int fd, int watch_flags, bool enabled) - { - if (enabled) { - loop_mech.enableFdWatch_nolock(fd, watcher, watch_flags | one_shot); - } - else { - loop_mech.disableFdWatch_nolock(fd, watch_flags); - } - } - - void deregister(BaseFdWatcher *callback, int fd) - { - loop_mech.removeFdWatch(fd, callback->watch_flags); - - waitqueue_node qnode; - getAttnLock(qnode); - - EventDispatch & ed = (EventDispatch &) loop_mech; - ed.issueDelete(callback); - - releaseLock(qnode); - } - - void deregister(BaseBidiFdWatcher *callback, int fd) - { - if (LoopTraits::has_separate_rw_fd_watches) { - // TODO - } - else { - loop_mech.removeFdWatch(fd, callback->watch_flags); - } - - waitqueue_node qnode; - getAttnLock(qnode); - - EventDispatch & ed = (EventDispatch &) loop_mech; - ed.issueDelete(callback); - - releaseLock(qnode); - } - - void reserveChildWatch(BaseChildWatcher *callBack) - { - loop_mech.reserveChildWatch(); - } - - void registerChild(BaseChildWatcher *callBack, pid_t child) - { - loop_mech.addChildWatch(child, callBack); - } - - void registerReservedChild(BaseChildWatcher *callBack, pid_t child) noexcept - { - loop_mech.addReservedChildWatch(child, callBack); - } - - void dequeueWatcher(BaseWatcher *watcher) noexcept - { - if (loop_mech.isQueued(watcher)) { - loop_mech.dequeueWatcher(watcher); - } - } - - // Acquire the attention lock (when held, ensures that no thread is polling the AEN - // mechanism). - void getAttnLock(waitqueue_node &qnode) - { - std::unique_lock ulock(wait_lock); - attn_waitqueue.queue(&qnode); - if (attn_waitqueue.getHead() != &qnode) { - loop_mech.interruptWait(); - while (attn_waitqueue.getHead() != &qnode) { - qnode.wait(ulock); - } - } - } - - // Acquire the poll-wait lock (to be held when polling the AEN mechanism; lower - // priority than the attention lock). - void getPollwaitLock(waitqueue_node &qnode) - { - std::unique_lock ulock(wait_lock); - if (attn_waitqueue.getHead() == nullptr) { - // Queue is completely empty: - attn_waitqueue.queue(&qnode); - } - else { - wait_waitqueue.queue(&qnode); - } - - while (attn_waitqueue.getHead() != &qnode) { - qnode.wait(ulock); - } - } - - // Release the poll-wait/attention lock. - void releaseLock(waitqueue_node &qnode) - { - std::unique_lock ulock(wait_lock); - waitqueue_node * nhead = attn_waitqueue.unqueue(); - if (nhead != nullptr) { - nhead->signal(); - } - else { - nhead = wait_waitqueue.getHead(); - if (nhead != nullptr) { - attn_waitqueue.queue(nhead); - nhead->signal(); - } - } - } - - void processSignalRearm(BaseSignalWatcher * bsw, Rearm rearmType) - { - // Called with lock held - if (rearmType == Rearm::REARM) { - loop_mech.rearmSignalWatch_nolock(bsw->siginfo.get_signo()); - } - else if (rearmType == Rearm::REMOVE) { - loop_mech.removeSignalWatch_nolock(bsw->siginfo.get_signo()); - } - } - - Rearm processFdRearm(BaseFdWatcher * bfw, Rearm rearmType, bool is_multi_watch) - { - // Called with lock held - if (is_multi_watch) { - BaseBidiFdWatcher * bdfw = static_cast(bfw); - - if (rearmType == Rearm::REMOVE) { - bdfw->read_removed = 1; - bdfw->watch_flags &= ~in_events; - - if (! LoopTraits::has_separate_rw_fd_watches) { - if (! bdfw->write_removed) { - return Rearm::NOOP; - } - else { - // both removed: actually remove - loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */); - return Rearm::REMOVE; - } - } - else { - // TODO this will need flags for such a loop, since it can't - // otherwise distinguish which channel watch to remove - loop_mech.removeFdWatch_nolock(bdfw->watch_fd, bdfw->watch_flags); - } - } - else if (rearmType == Rearm::DISARM) { - // Nothing more to do - } - else if (rearmType == Rearm::REARM) { - bdfw->watch_flags |= in_events; - if (! LoopTraits::has_separate_rw_fd_watches) { - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, - static_cast(bdfw), - (bdfw->watch_flags & (in_events | out_events)) | one_shot); - } - else { - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, - static_cast(bdfw), - in_events | one_shot); - } - } - return rearmType; - } - else { - if (rearmType == Rearm::REARM) { - loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw, - (bfw->watch_flags & (in_events | out_events)) | one_shot); - } - else if (rearmType == Rearm::REMOVE) { - loop_mech.removeFdWatch_nolock(bfw->watch_fd, bfw->watch_flags); - } - return rearmType; - } - } - - Rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, Rearm rearmType) - { - // Called with lock held - if (rearmType == Rearm::REMOVE) { - bdfw->write_removed = 1; - bdfw->watch_flags &= ~out_events; - - if (LoopTraits::has_separate_rw_fd_watches) { - // TODO this will need flags for such a loop, since it can't - // otherwise distinguish which channel watch to remove - loop_mech.removeFdWatch_nolock(bdfw->watch_fd, bdfw->watch_flags); - return bdfw->read_removed ? Rearm::REMOVE : Rearm::NOOP; - } - else { - if (! bdfw->read_removed) { - return Rearm::NOOP; - } - else { - // both removed: actually remove - loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */); - return Rearm::REMOVE; - } - } - } - else if (rearmType == Rearm::DISARM) { - // Nothing more to do - } - else if (rearmType == Rearm::REARM) { - bdfw->watch_flags |= out_events; - if (! LoopTraits::has_separate_rw_fd_watches) { - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, - static_cast(bdfw), - (bdfw->watch_flags & (in_events | out_events)) | one_shot); - } - else { - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, - static_cast(bdfw), - out_events | one_shot); - } - } - return rearmType; - } - - bool processEvents() noexcept - { - EventDispatch & ed = (EventDispatch &) loop_mech; - ed.lock.lock(); - - // So this pulls *all* currently pending events and processes them in the current thread. - // That's probably good for throughput, but maybe the behavior should be configurable. - - BaseWatcher * pqueue = ed.pullEvent(); - bool active = false; - - while (pqueue != nullptr) { - - pqueue->active = true; - active = true; - - Rearm rearmType = Rearm::NOOP; - bool is_multi_watch = false; - BaseBidiFdWatcher *bbfw = nullptr; - - // (Above variables are initialised only to silence compiler warnings). - - // Read/manipulate watch_flags (if necessary) *before* we release the lock: - if (pqueue->watchType == WatchType::FD) { - BaseFdWatcher *bfw = static_cast(pqueue); - bbfw = static_cast(bfw); - is_multi_watch = bfw->watch_flags & dprivate::multi_watch; - if (! LoopTraits::has_separate_rw_fd_watches && is_multi_watch) { - // Clear the input watch flags to avoid enabling read watcher while active: - bfw->watch_flags &= ~in_events; - } - } - else if (pqueue->watchType == WatchType::SECONDARYFD) { - is_multi_watch = true; - char * rp = (char *)pqueue; - rp -= offsetof(BaseBidiFdWatcher, outWatcher); - bbfw = (BaseBidiFdWatcher *)rp; - if (! LoopTraits::has_separate_rw_fd_watches) { - bbfw->watch_flags &= ~out_events; - } - } - - ed.lock.unlock(); - - // Note that we select actions based on the type of the watch, as determined by the watchType - // member. In some ways this screams out for polmorphism; a virtual function could be overridden - // by each of the watcher types. I've instead used switch/case because I think it will perform - // slightly better without the overhead of a virtual function dispatch, but it's got to be a - // close call; I might be guilty of premature optimisation here. - - switch (pqueue->watchType) { - case WatchType::SIGNAL: { - BaseSignalWatcher *bsw = static_cast(pqueue); - rearmType = bsw->gotSignal(this, bsw->siginfo.get_signo(), bsw->siginfo); - break; - } - case WatchType::FD: { - BaseFdWatcher *bfw = static_cast(pqueue); - if (is_multi_watch) { - // The primary watcher for a multi-watch watcher is queued for - // read events. - rearmType = bbfw->readReady(this, bfw->watch_fd); - } - else { - rearmType = bfw->gotEvent(this, bfw->watch_fd, bfw->event_flags); - } - break; - } - case WatchType::CHILD: { - BaseChildWatcher *bcw = static_cast(pqueue); - bcw->gotTermStat(this, bcw->watch_pid, bcw->child_status); - // Child watches automatically remove: - // TODO what if they want to return REMOVED... - rearmType = Rearm::REMOVE; - break; - } - case WatchType::SECONDARYFD: { - // first construct a pointer to the main watcher: - rearmType = bbfw->writeReady(this, bbfw->watch_fd); - break; - } - default: ; - } - - ed.lock.lock(); - - // (if REMOVED, we must not touch pqueue at all) - if (rearmType != Rearm::REMOVED) { - - pqueue->active = false; - if (pqueue->deleteme) { - // We don't want a watch that is marked "deleteme" to re-arm itself. - rearmType = Rearm::REMOVE; - } - switch (pqueue->watchType) { - case WatchType::SIGNAL: - processSignalRearm(static_cast(pqueue), rearmType); - break; - case WatchType::FD: - rearmType = processFdRearm(static_cast(pqueue), rearmType, is_multi_watch); - break; - case WatchType::SECONDARYFD: - rearmType = processSecondaryRearm(bbfw, rearmType); - break; - default: ; - } - - if (pqueue->deleteme || rearmType == Rearm::REMOVE) { - ed.lock.unlock(); - (is_multi_watch ? bbfw : pqueue)->watchRemoved(); - ed.lock.lock(); - } - } - - pqueue = ed.pullEvent(); - } - - ed.lock.unlock(); - return active; - } - - - public: - void run() noexcept - { - while (! processEvents()) { - waitqueue_node qnode; - - // We only allow one thread to poll the mechanism at any time, since otherwise - // removing event watchers is a nightmare beyond comprehension. - getPollwaitLock(qnode); - - // Pull events from the AEN mechanism and insert them in our internal queue: - loop_mech.pullEvents(true); - - // Now release the wait lock: - releaseLock(qnode); - } - } -}; - - -typedef EventLoop NEventLoop; -typedef EventLoop TEventLoop; - -// from dasync.cc: -TEventLoop & getSystemLoop(); - -// Posix signal event watcher -template -class PosixSignalWatcher : private dprivate::BaseSignalWatcher -{ - using BaseWatcher = dprivate::BaseWatcher; - -public: - using SigInfo_p = typename dprivate::BaseSignalWatcher::SigInfo_p; - - // Register this watcher to watch the specified signal. - // If an attempt is made to register with more than one event loop at - // a time, behaviour is undefined. - inline void registerWatch(EventLoop *eloop, int signo) - { - BaseWatcher::init(); - this->siginfo.set_signo(signo); - eloop->registerSignal(this, signo); - } - - inline void deregisterWatch(EventLoop *eloop) noexcept - { - eloop->deregister(this, this->siginfo.get_signo()); - } - - // virtual Rearm gotSignal(EventLoop *, int signo, SigInfo_p info) = 0; -}; - -// Posix file descriptor event watcher -template -class PosixFdWatcher : private dprivate::BaseFdWatcher -{ - using BaseWatcher = dprivate::BaseWatcher; - - protected: - - // Set the types of event to watch. Only supported if LoopTraits::has_bidi_fd_watch - // is true; otherwise has unspecified behavior. - // Only safe to call from within the callback handler (gotEvent). Might not take - // effect until the current callback handler returns with REARM. - void setWatchFlags(int newFlags) - { - this->watch_flags = newFlags; - } - - public: - - // Register a file descriptor watcher with an event loop. Flags - // can be any combination of dasync::in_events / dasync::out_events. - // Exactly one of in_events/out_events must be specified if the event - // loop does not support bi-directional fd watchers. - // - // Mechanisms supporting dual watchers allow for two watchers for a - // single file descriptor (one watching read status and the other - // write status). Others mechanisms support only a single watcher - // per file descriptor. Adding a watcher beyond what is supported - // causes undefined behavior. - // - // Can fail with std::bad_alloc or std::system_error. - void registerWith(EventLoop *eloop, int fd, int flags) - { - BaseWatcher::init(); - this->watch_fd = fd; - this->watch_flags = flags; - eloop->registerFd(this, fd, flags); - } - - // Deregister a file descriptor watcher. - // - // If other threads may be polling the event loop, it is not safe to assume - // the watcher is unregistered until the watchRemoved() callback is issued - // (which will not occur until the event handler returns, if it is active). - // In a single threaded environment, it is safe to delete the watcher after - // calling this method as long as the handler (if it is active) accesses no - // internal state and returns Rearm::REMOVED. - // TODO: implement REMOVED, or correct above statement. - void deregisterWatch(EventLoop *eloop) noexcept - { - eloop->deregister(this, this->watch_fd); - } - - void setEnabled(EventLoop *eloop, bool enable) noexcept - { - std::lock_guard guard(eloop->getBaseLock()); - eloop->setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable); - if (! enable) { - eloop->dequeueWatcher(this); - } - } - - // virtual Rearm gotEvent(EventLoop *, int fd, int flags) = 0; -}; - -// A Bi-directional file descriptor watcher with independent read- and write- channels. -// This watcher type has two event notification methods which can both potentially be -// active at the same time. -template -class PosixBidiFdWatcher : private dprivate::BaseBidiFdWatcher -{ - using BaseWatcher = dprivate::BaseWatcher; - - void setWatchEnabled(EventLoop *eloop, bool in, bool b) - { - int events = in ? in_events : out_events; - - if (b) { - this->watch_flags |= events; - } - else { - this->watch_flags &= ~events; - } - if (LoopTraits::has_separate_rw_fd_watches) { - dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher; - eloop->setFdEnabled_nolock(watcher, this->watch_fd, events | one_shot, b); - if (! b) { - eloop->dequeueWatcher(watcher); - } - } - else { - eloop->setFdEnabled_nolock(this, this->watch_fd, - (this->watch_flags & (in_events | out_events)) | one_shot, - (this->watch_flags & (in_events | out_events)) != 0); - if (! b) { - dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher; - eloop->dequeueWatcher(watcher); - } - } - } - - protected: - - // TODO if a watch is disabled and currently queued, we should de-queue it. - - void setInWatchEnabled(EventLoop *eloop, bool b) noexcept - { - eloop->getBaseLock().lock(); - setWatchEnabled(eloop, true, b); - eloop->getBaseLock().unlock(); - } - - void setOutWatchEnabled(EventLoop *eloop, bool b) noexcept - { - eloop->getBaseLock().lock(); - setWatchEnabled(eloop, false, b); - eloop->getBaseLock().unlock(); - } - - // Set the watch flags, which enables/disables both the in-watch and the out-watch accordingly. - // - // Concurrency: this method can only be called if - // - it does not enable a watcher that might currently be active - /// - unless the event loop will not be polled while the watcher is active. - // (i.e. it is ok to call setWatchFlags from within the readReady/writeReady handlers if no other - // thread will poll the event loop; it is ok to *dis*able a watcher that might be active). - void setWatchFlags(EventLoop * eloop, int newFlags) - { - std::lock_guard guard(eloop->getBaseLock()); - if (LoopTraits::has_separate_rw_fd_watches) { - setWatchEnabled(eloop, true, (newFlags & in_events) != 0); - setWatchEnabled(eloop, false, (newFlags & out_events) != 0); - } - else { - this->watch_flags = (this->watch_flags & ~IO_EVENTS) | newFlags; - eloop->setFdEnabled((dprivate::BaseWatcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true); - } - } - - public: - - // Register a bi-direction file descriptor watcher with an event loop. Flags - // can be any combination of dasync::in_events / dasync::out_events. - // - // Can fail with std::bad_alloc or std::system_error. - void registerWith(EventLoop *eloop, int fd, int flags) - { - BaseWatcher::init(); - this->outWatcher.BaseWatcher::init(); - this->watch_fd = fd; - this->watch_flags = flags | dprivate::multi_watch; - eloop->registerFd(this, fd, flags); - } - - // Deregister a bi-direction file descriptor watcher. - // - // If other threads may be polling the event loop, it is not safe to assume - // the watcher is unregistered until the watchRemoved() callback is issued - // (which will not occur until the event handler returns, if it is active). - // In a single threaded environment, it is safe to delete the watcher after - // calling this method as long as the handler (if it is active) accesses no - // internal state and returns Rearm::REMOVED. - // TODO: implement REMOVED, or correct above statement. - void deregisterWatch(EventLoop *eloop) noexcept - { - eloop->deregister(this, this->watch_fd); - } - - // Rearm readReady(EventLoop * eloop, int fd) noexcept - // Rearm writeReady(EventLoop * eloop, int fd) noexcept -}; - -// Posix child process event watcher -template -class PosixChildWatcher : private dprivate::BaseChildWatcher -{ - using BaseWatcher = dprivate::BaseWatcher; - - public: - // Reserve resources for a child watcher with the given event loop. - // Reservation can fail with std::bad_alloc. - void reserveWith(EventLoop *eloop) - { - eloop->reserveChildWatch(); - } - - // Register a watcher for the given child process with an event loop. - // Registration can fail with std::bad_alloc. - void registerWith(EventLoop *eloop, pid_t child) - { - BaseWatcher::init(); - this->watch_pid = child; - eloop->registerChild(this, child); - } - - // Register a watcher for the given child process with an event loop, - // after having reserved resources previously (using reserveWith). - // Registration cannot fail. - void registerReserved(EventLoop *eloop, pid_t child) noexcept - { - BaseWatcher::init(); - eloop->registerReservedChild(this, child); - } - - // virtual void gotTermStat(EventLoop *, pid_t child, int status) = 0; -}; - -} // namespace dasync - -#endif diff --git a/src/dasync/dmutex.h b/src/dasync/dmutex.h deleted file mode 100644 index b219621..0000000 --- a/src/dasync/dmutex.h +++ /dev/null @@ -1,61 +0,0 @@ -#ifndef D_MUTEX_H_INCLUDED -#define D_MUTEX_H_INCLUDED - -//#include -#include - -namespace dasync { - -// Simple non-recursive mutex, with priority inheritance to avoid priority inversion. -/* -class DMutex -{ - private: - pthread_mutex_t mutex; - - public: - DMutex() - { - // Avoid priority inversion by using PTHREAD_PRIO_INHERIT - pthread_mutexattr_t attribs; - pthread_mutexattr_init(&attribs); - pthread_mutexattr_setprotocol(&attribs, PTHREAD_PRIO_INHERIT); - pthread_mutex_init(&mutex, &attribs); - } - - void lock() - { - pthread_mutex_lock(&mutex); - } - - void unlock() - { - pthread_mutex_unlock(&mutex); - } -}; -*/ - -using DMutex = std::mutex; - -// A "null" mutex, for which locking / unlocking actually does nothing. -class NullMutex -{ - #ifdef __GNUC__ - #ifndef __clang__ - char empty[0]; // Make class instances take up no space (gcc) - #else - char empty[0] __attribute__((unused)); // Make class instances take up no space (clang) - #endif - #endif - - public: - void lock() { } - void unlock() { } - void try_lock() { } -}; - - -} // end of namespace - - -#endif diff --git a/src/dasynq/dasynq-childproc.h b/src/dasynq/dasynq-childproc.h new file mode 100644 index 0000000..e364613 --- /dev/null +++ b/src/dasynq/dasynq-childproc.h @@ -0,0 +1,141 @@ +namespace dasynq { + +// Map of pid_t to void *, with possibility of reserving entries so that mappings can +// be later added with no danger of allocator exhaustion (bad_alloc). +class pid_map +{ + using pair = std::pair; + std::unordered_map base_map; + std::vector backup_vector; + + // Number of entries in backup_vector that are actually in use (as opposed + // to simply reserved): + int backup_size = 0; + + public: + using entry = std::pair; + + entry get(pid_t key) noexcept + { + auto it = base_map.find(key); + if (it == base_map.end()) { + // Not in map; look in vector + for (int i = 0; i < backup_size; i++) { + if (backup_vector[i].first == key) { + return entry(true, backup_vector[i].second); + } + } + + return entry(false, nullptr); + } + + return entry(true, it->second); + } + + entry erase(pid_t key) noexcept + { + auto iter = base_map.find(key); + if (iter != base_map.end()) { + entry r(true, iter->second); + base_map.erase(iter); + return r; + } + for (int i = 0; i < backup_size; i++) { + if (backup_vector[i].first == key) { + entry r(true, backup_vector[i].second); + backup_vector.erase(backup_vector.begin() + i); + return r; + } + } + return entry(false, nullptr); + } + + // Throws bad_alloc on reservation failure + void reserve() + { + backup_vector.resize(backup_vector.size() + 1); + } + + void add(pid_t key, void *val) // throws std::bad_alloc + { + base_map[key] = val; + } + + void add_from_reserve(pid_t key, void *val) noexcept + { + try { + base_map[key] = val; + backup_vector.resize(backup_vector.size() - 1); + } + catch (std::bad_alloc &) { + // We couldn't add into the map, use the reserve: + backup_vector[backup_size++] = pair(key, val); + } + } +}; + +namespace { + void sigchld_handler(int signum) + { + // If SIGCHLD has no handler (is ignored), SIGCHLD signals will + // not be queued for terminated child processes. (On Linux, the + // default disposition for SIGCHLD is to be ignored but *not* have + // this behavior, which seems inconsistent. Setting a handler doesn't + // hurt in any case). + } +} + +template class ChildProcEvents : public Base +{ + private: + pid_map child_waiters; + + using SigInfo = typename Base::SigInfo; + + protected: + void receiveSignal(SigInfo &siginfo, void *userdata) + { + if (siginfo.get_signo() == SIGCHLD) { + int status; + pid_t child; + while ((child = waitpid(-1, &status, WNOHANG)) > 0) { + pid_map::entry ent = child_waiters.erase(child); + if (ent.first) { + Base::receiveChildStat(child, status, ent.second); + } + } + } + else { + Base::receiveSignal(siginfo, userdata); + } + } + + public: + void reserveChildWatch() + { + child_waiters.reserve(); + } + + void addChildWatch(pid_t child, void *val) + { + child_waiters.add(child, val); + } + + void addReservedChildWatch(pid_t child, void *val) noexcept + { + child_waiters.add_from_reserve(child, val); + } + + template void init(T *loop_mech) + { + loop_mech->addSignalWatch(SIGCHLD, nullptr); + struct sigaction chld_action; + chld_action.sa_handler = sigchld_handler; + sigemptyset(&chld_action.sa_mask); + chld_action.sa_flags = 0; + sigaction(SIGCHLD, &chld_action, nullptr); + } +}; + + +} // end namespace diff --git a/src/dasynq/dasynq-epoll.h b/src/dasynq/dasynq-epoll.h new file mode 100644 index 0000000..1ffbc56 --- /dev/null +++ b/src/dasynq/dasynq-epoll.h @@ -0,0 +1,327 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace dasynq { + +template class EpollLoop; + +class EpollTraits +{ + template friend class EpollLoop; + + public: + + class SigInfo + { + template friend class EpollLoop; + + struct signalfd_siginfo info; + + public: + int get_signo() { return info.ssi_signo; } + int get_sicode() { return info.ssi_code; } + int get_siint() { return info.ssi_int; } + int get_ssiptr() { return info.ssi_ptr; } + int get_ssiaddr() { return info.ssi_addr; } + + void set_signo(int signo) { info.ssi_signo = signo; } + }; + + class FD_r; + + // File descriptor optional storage. If the mechanism can return the file descriptor, this + // class will be empty, otherwise it can hold a file descriptor. + class FD_s { + friend class FD_r; + + // Epoll doesn't return the file descriptor (it can, but it can't return both file + // descriptor and user data). + int fd; + }; + + // File descriptor reference (passed to event callback). If the mechanism can return the + // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor + // must be stored in an FD_s instance. + class FD_r { + public: + int getFd(FD_s ss) + { + return ss.fd; + } + }; + + const static bool has_bidi_fd_watch = true; + const static bool has_separate_rw_fd_watches = false; +}; + + +template class EpollLoop : public Base +{ + int epfd; // epoll fd + int sigfd; // signalfd fd; -1 if not initialised + sigset_t sigmask; + + std::unordered_map sigdataMap; + + // Base contains: + // lock - a lock that can be used to protect internal structure. + // receive*() methods will be called with lock held. + // receiveSignal(SigInfo &, user *) noexcept + // receiveFdEvent(FD_r, user *, int flags) noexcept + + using SigInfo = EpollTraits::SigInfo; + using FD_r = typename EpollTraits::FD_r; + + void processEvents(epoll_event *events, int r) + { + std::lock_guard guard(Base::lock); + + for (int i = 0; i < r; i++) { + void * ptr = events[i].data.ptr; + + if (ptr == &sigfd) { + // Signal + SigInfo siginfo; + while (true) { + int r = read(sigfd, &siginfo.info, sizeof(siginfo.info)); + if (r == -1) break; + if (siginfo.get_signo() != SIGCHLD) { + // TODO remove the special exception for SIGCHLD? + sigdelset(&sigmask, siginfo.get_signo()); + } + auto iter = sigdataMap.find(siginfo.get_signo()); + if (iter != sigdataMap.end()) { + void *userdata = (*iter).second; + Base::receiveSignal(siginfo, userdata); + } + } + signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); + } + else { + int flags = 0; + (events[i].events & EPOLLIN) && (flags |= IN_EVENTS); + (events[i].events & EPOLLHUP) && (flags |= IN_EVENTS); + (events[i].events & EPOLLOUT) && (flags |= OUT_EVENTS); + (events[i].events & EPOLLERR) && (flags |= IN_EVENTS | OUT_EVENTS | ERR_EVENTS); + Base::receiveFdEvent(*this, FD_r(), ptr, flags); + } + } + } + + public: + + /** + * EpollLoop constructor. + * + * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised. + */ + EpollLoop() : sigfd(-1) + { + epfd = epoll_create1(EPOLL_CLOEXEC); + if (epfd == -1) { + throw std::system_error(errno, std::system_category()); + } + sigemptyset(&sigmask); + Base::init(this); + } + + ~EpollLoop() + { + close(epfd); + if (sigfd != -1) { + close(sigfd); + } + } + + // flags: IN_EVENTS | OUT_EVENTS + void addFdWatch(int fd, void *userdata, int flags) + { + struct epoll_event epevent; + // epevent.data.fd = fd; + epevent.data.ptr = userdata; + epevent.events = 0; + + if (flags & ONE_SHOT) { + epevent.events = EPOLLONESHOT; + } + if (flags & IN_EVENTS) { + epevent.events |= EPOLLIN; + } + if (flags & OUT_EVENTS) { + epevent.events |= EPOLLOUT; + } + + if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epevent) == -1) { + throw new std::system_error(errno, std::system_category()); + } + } + + void removeFdWatch(int fd, int flags) noexcept + { + epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr); + } + + void removeFdWatch_nolock(int fd, int flags) noexcept + { + removeFdWatch(fd, flags); + } + + // Note this will *replace* the old flags with the new, that is, + // it can enable *or disable* read/write events. + void enableFdWatch(int fd, void *userdata, int flags) noexcept + { + struct epoll_event epevent; + // epevent.data.fd = fd; + epevent.data.ptr = userdata; + epevent.events = 0; + + if (flags & ONE_SHOT) { + epevent.events = EPOLLONESHOT; + } + if (flags & IN_EVENTS) { + epevent.events |= EPOLLIN; + } + if (flags & OUT_EVENTS) { + epevent.events |= EPOLLOUT; + } + + if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) { + // Shouldn't be able to fail + // throw new std::system_error(errno, std::system_category()); + } + } + + void enableFdWatch_nolock(int fd, void *userdata, int flags) + { + enableFdWatch(fd, userdata, flags); + } + + void disableFdWatch(int fd, int flags) noexcept + { + struct epoll_event epevent; + // epevent.data.fd = fd; + epevent.data.ptr = nullptr; + epevent.events = 0; + + // Epoll documentation says that hangup will still be reported, need to check + // whether this is really the case. Suspect it is really only the case if + // EPOLLIN is set. + if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) { + // Let's assume that this can't fail. + // throw new std::system_error(errno, std::system_category()); + } + } + + void disableFdWatch_nolock(int fd, int flags) noexcept + { + disableFdWatch(fd, flags); + } + + // Note signal should be masked before call. + void addSignalWatch(int signo, void *userdata) + { + std::lock_guard guard(Base::lock); + + sigdataMap[signo] = userdata; + + // Modify the signal fd to watch the new signal + bool was_no_sigfd = (sigfd == -1); + sigaddset(&sigmask, signo); + sigfd = signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); + if (sigfd == -1) { + throw new std::system_error(errno, std::system_category()); + } + + if (was_no_sigfd) { + // Add the signalfd to the epoll set. + struct epoll_event epevent; + epevent.data.ptr = &sigfd; + epevent.events = EPOLLIN; + // No need for EPOLLONESHOT - we can pull the signals out + // as we see them. + if (epoll_ctl(epfd, EPOLL_CTL_ADD, sigfd, &epevent) == -1) { + close(sigfd); + throw new std::system_error(errno, std::system_category()); + } + } + } + + // Note, called with lock held: + void rearmSignalWatch_nolock(int signo) noexcept + { + sigaddset(&sigmask, signo); + signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); + } + + void removeSignalWatch_nolock(int signo) noexcept + { + sigdelset(&sigmask, signo); + signalfd(sigfd, &sigmask, 0); + } + + void removeSignalWatch(int signo) noexcept + { + std::lock_guard guard(Base::lock); + removeSignalWatch_nolock(signo); + } + + // If events are pending, process an unspecified number of them. + // If no events are pending, wait until one event is received and + // process this event (and possibly any other events received + // simultaneously). + // If processing an event removes a watch, there is a possibility + // that the watched event will still be reported (if it has + // occurred) before pullEvents() returns. + // + // do_wait - if false, returns immediately if no events are + // pending. + void pullEvents(bool do_wait) + { + epoll_event events[16]; + int r = epoll_wait(epfd, events, 16, do_wait ? -1 : 0); + if (r == -1 || r == 0) { + // signal or no events + return; + } + + processEvents(events, r); + } + + // If events are pending, process one of them. + // If no events are pending, wait until one event is received and + // process this event. + // + // do_wait - if false, returns immediately if no events are + // pending. + void pullOneEvent(bool do_wait) + { + epoll_event events[1]; + int r = epoll_wait(epfd, events, 1, do_wait ? -1 : 0); + if (r == -1 || r == 0) { + // signal or no events + return; + } + + processEvents(events, r); + } + + // Interrupt any current poll operation (pullEvents/pullOneEvent), causing + // it to to return immediately. + void interruptWait() + { + // TODO + } +}; + +} // end namespace diff --git a/src/dasynq/dasynq-flags.h b/src/dasynq/dasynq-flags.h new file mode 100644 index 0000000..4b0813b --- /dev/null +++ b/src/dasynq/dasynq-flags.h @@ -0,0 +1,13 @@ +namespace dasynq { + +// Event type bits +constexpr unsigned int IN_EVENTS = 1; +constexpr unsigned int OUT_EVENTS = 2; +constexpr unsigned int ERR_EVENTS = 4; + +constexpr unsigned int ONE_SHOT = 8; + +// Masks: +constexpr unsigned int IO_EVENTS = IN_EVENTS | OUT_EVENTS; + +} diff --git a/src/dasynq/dasynq-kqueue.h b/src/dasynq/dasynq-kqueue.h new file mode 100644 index 0000000..64a3828 --- /dev/null +++ b/src/dasynq/dasynq-kqueue.h @@ -0,0 +1,372 @@ +#include +#include +#include +#include +#include + +#ifdef __OpenBSD__ +#include // for __thrsigdivert aka sigtimedwait +#include +extern "C" { + int __thrsigdivert(sigset_t set, siginfo_t *info, const struct timespec * timeout); +} +#endif + +#include +#include +#include +#include + +#include +#include + +namespace dasynq { + +template class KqueueLoop; + +class KqueueTraits +{ + template friend class KqueueLoop; + + public: + + class SigInfo + { + template friend class KqueueLoop; + + siginfo_t info; + + public: + int get_signo() { return info.si_signo; } + int get_sicode() { return info.si_code; } + char * get_ssiaddr() { return info.si_addr; } + + void set_signo(int signo) { info.si_signo = signo; } + }; + + class FD_r; + + // File descriptor optional storage. If the mechanism can return the file descriptor, this + // class will be empty, otherwise it can hold a file descriptor. + class FD_s { + // Epoll doesn't return the file descriptor (it can, but it can't return both file + // descriptor and user data). + // TODO make true empty. + }; + + // File descriptor reference (passed to event callback). If the mechanism can return the + // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor + // must be stored in an FD_s instance. + class FD_r { + int fd; + public: + int getFd(FD_s ss) + { + return fd; + } + FD_r(int nfd) : fd(nfd) + { + } + }; + + const static bool has_bidi_fd_watch = false; + const static bool has_separate_rw_fd_watches = true; +}; + +#if defined(__OpenBSD__) +// OpenBSD has no sigtimedwait (or sigwaitinfo) but does have "__thrsigdivert", which is +// essentially an incomplete version of the same thing. Discussion with OpenBSD developer +// Ted Unangst suggested that the siginfo_t structure returned might not always have all +// fields set correctly. Furthermore there is a bug such that specifying a zero timeout (or +// indeed any timeout less than a tick) results in NO timeout. +static inline int sigtimedwait(const sigset_t *ssp, siginfo_t *info, struct timespec *timeout) +{ + // We know that we're only called with a timeout of 0 (which doesn't work properly) and + // that we safely overwrite the timeout. So, we set tv_nsec to an invalid value, which + // will cause EINVAL to be returned, but will still pick up any pending signals *first*. + timeout->tv_nsec = 1000000001; + return __thrsigdivert(*ssp, info, timeout); +} +#endif + +template class KqueueLoop : public Base +{ + int kqfd; // kqueue fd + sigset_t sigmask; // enabled signal watch mask + + // Map of signal number to user data pointer. If kqueue had been better thought-through, + // we shouldn't need this. Although we can associate user data with an EVFILT_SIGNAL kqueue + // filter, the problem is that the kqueue signal report *coexists* with the regular signal + // delivery mechanism without having any connection to it. Whereas regular signals can be + // queued (especially "realtime" signals, via sigqueue()), kqueue just maintains a counter + // of delivery attempts and clears this when we read the event. What this means is that + // kqueue won't necessarily tell us if signals are pending, in the case that: + // 1) it already reported the attempted signal delivery and + // 2) more than one of the same signal was pending at that time and + // 3) no more deliveries of the same signal have been attempted in the meantime. + // Of course, if kqueue doesn't report the signal, then it doesn't give us the data associated + // with the event, so we need to maintain that separately too: + std::unordered_map sigdataMap; + + // Base contains: + // lock - a lock that can be used to protect internal structure. + // receive*() methods will be called with lock held. + // receiveSignal(SigInfo &, user *) noexcept + // receiveFdEvent(FD_r, user *, int flags) noexcept + + using SigInfo = KqueueTraits::SigInfo; + using FD_r = typename KqueueTraits::FD_r; + + void processEvents(struct kevent *events, int r) + { + std::lock_guard guard(Base::lock); + + for (int i = 0; i < r; i++) { + if (events[i].filter == EVFILT_SIGNAL) { + SigInfo siginfo; + sigset_t sset; + sigemptyset(&sset); + sigaddset(&sset, events[i].ident); + struct timespec timeout; + timeout.tv_sec = 0; + timeout.tv_nsec = 0; + if (sigtimedwait(&sset, &siginfo.info, &timeout) > 0) { + Base::receiveSignal(siginfo, (void *)events[i].udata); + } + + if (events[i].ident != SIGCHLD) { + sigdelset(&sigmask, events[i].ident); + events[i].flags = EV_DISABLE; + } + else { + // TODO can we remove this SIGCHLD hack? + events[i].flags = EV_ENABLE; + } + } + else if (events[i].filter == EVFILT_READ || events[i].filter == EVFILT_WRITE) { + int flags = events[i].filter == EVFILT_READ ? IN_EVENTS : OUT_EVENTS; + Base::receiveFdEvent(*this, FD_r(events[i].ident), events[i].udata, flags); + events[i].flags = EV_DISABLE | EV_CLEAR; + // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for + // another connection). + } + else { + events[i].flags = EV_DISABLE; + } + } + + // Now we disable all received events, to simulate EV_DISPATCH: + kevent(kqfd, events, r, nullptr, 0, nullptr); + } + + public: + + /** + * KqueueLoop constructor. + * + * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised. + */ + KqueueLoop() + { + kqfd = kqueue(); + if (kqfd == -1) { + throw std::system_error(errno, std::system_category()); + } + sigemptyset(&sigmask); + Base::init(this); + } + + ~KqueueLoop() + { + close(kqfd); + } + + void setFilterEnabled(short filterType, uintptr_t ident, bool enable) + { + struct kevent kev; + EV_SET(&kev, ident, filterType, enable ? EV_ENABLE : EV_DISABLE, 0, 0, 0); + kevent(kqfd, &kev, 1, nullptr, 0, nullptr); + } + + void removeFilter(short filterType, uintptr_t ident) + { + struct kevent kev; + EV_SET(&kev, ident, filterType, EV_DELETE, 0, 0, 0); + kevent(kqfd, &kev, 1, nullptr, 0, nullptr); + } + + // flags: IN_EVENTS | OUT_EVENTS + void addFdWatch(int fd, void *userdata, int flags) + { + // TODO kqueue doesn't support EVFILE_WRITE on file fd's :/ + + short filter = (flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE; + + struct kevent kev; + EV_SET(&kev, fd, filter, EV_ADD, 0, 0, userdata); + if (kevent(kqfd, &kev, 1, nullptr, 0, nullptr) == -1) { + throw new std::system_error(errno, std::system_category()); + } + } + + void removeFdWatch(int fd, int flags) + { + removeFilter((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd); + } + + void removeFdWatch_nolock(int fd, int flags) + { + removeFdWatch(fd, flags); + } + + void enableFdWatch(int fd, void *userdata, int flags) + { + setFilterEnabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, true); + } + + void enableFdWatch_nolock(int fd, void *userdata, int flags) + { + enableFdWatch(fd, userdata, flags); + } + + void disableFdWatch(int fd, int flags) + { + setFilterEnabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, false); + } + + void disableFdWatch_nolock(int fd, int flags) + { + disableFdWatch(fd, flags); + } + + // Note signal should be masked before call. + void addSignalWatch(int signo, void *userdata) + { + std::lock_guard guard(Base::lock); + + sigdataMap[signo] = userdata; + sigaddset(&sigmask, signo); + + struct kevent evt; + EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ADD, 0, 0, userdata); + // TODO use EV_DISPATCH if available (not on OpenBSD) + + if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) { + throw new std::system_error(errno, std::system_category()); + } + } + + // Note, called with lock held: + void rearmSignalWatch_nolock(int signo) noexcept + { + sigaddset(&sigmask, signo); + + struct kevent evt; + EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ENABLE, 0, 0, 0); + // TODO use EV_DISPATCH if available (not on OpenBSD) + + kevent(kqfd, &evt, 1, nullptr, 0, nullptr); + } + + void removeSignalWatch_nolock(int signo) noexcept + { + sigdelset(&sigmask, signo); + + struct kevent evt; + EV_SET(&evt, signo, EVFILT_SIGNAL, EV_DELETE, 0, 0, 0); + + kevent(kqfd, &evt, 1, nullptr, 0, nullptr); + } + + void removeSignalWatch(int signo) noexcept + { + std::lock_guard guard(Base::lock); + removeSignalWatch_nolock(signo); + } + + // If events are pending, process an unspecified number of them. + // If no events are pending, wait until one event is received and + // process this event (and possibly any other events received + // simultaneously). + // If processing an event removes a watch, there is a possibility + // that the watched event will still be reported (if it has + // occurred) before pullEvents() returns. + // + // do_wait - if false, returns immediately if no events are + // pending. + void pullEvents(bool do_wait) + { + // We actually need to check pending signals, since + // kqueue can count signals as they are delivered but the count is + // cleared when we poll the kqueue, meaning that signals might still + // be pending if they were queued multiple times at the last poll. + + // TODO we should only poll for signals that *have* been reported + // as being raised more than once prior via kevent, rather than all + // signals that have been registered - in many cases that will allow + // us to skip the sigtimedwait call altogether. + + { + std::lock_guard guard(Base::lock); + struct timespec timeout; + timeout.tv_sec = 0; + timeout.tv_nsec = 0; + SigInfo siginfo; + int rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout); + while (rsigno > 0) { + // TODO avoid this hack for SIGCHLD somehow + if (rsigno != SIGCHLD) { + sigdelset(&sigmask, rsigno); + // TODO accumulate and disable multiple filters with a single kevents call + // rather than disabling each individually + setFilterEnabled(EVFILT_SIGNAL, rsigno, false); + } + Base::receiveSignal(siginfo, sigdataMap[rsigno]); + rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout); + } + } + + struct kevent events[16]; + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 0; + int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts); + if (r == -1 || r == 0) { + // signal or no events + return; + } + + processEvents(events, r); + } + + // If events are pending, process one of them. + // If no events are pending, wait until one event is received and + // process this event. + // + // do_wait - if false, returns immediately if no events are + // pending. + void pullOneEvent(bool do_wait) + { + // TODO must check for pending signals as per pullEvents() + struct kevent events[1]; + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 0; + int r = kevent(kqfd, nullptr, 0, events, 1, do_wait ? nullptr : &ts); + if (r == -1 || r == 0) { + // signal or no events + return; + } + + processEvents(events, r); + } + + // Interrupt any current poll operation (pullEvents/pullOneEvent), causing + // it to to return immediately. + void interruptWait() + { + // TODO + } +}; + +} // end namespace diff --git a/src/dasynq/dasynq-mutex.h b/src/dasynq/dasynq-mutex.h new file mode 100644 index 0000000..a46c229 --- /dev/null +++ b/src/dasynq/dasynq-mutex.h @@ -0,0 +1,61 @@ +#ifndef D_MUTEX_H_INCLUDED +#define D_MUTEX_H_INCLUDED + +//#include +#include + +namespace dasynq { + +// Simple non-recursive mutex, with priority inheritance to avoid priority inversion. +/* +class DMutex +{ + private: + pthread_mutex_t mutex; + + public: + DMutex() + { + // Avoid priority inversion by using PTHREAD_PRIO_INHERIT + pthread_mutexattr_t attribs; + pthread_mutexattr_init(&attribs); + pthread_mutexattr_setprotocol(&attribs, PTHREAD_PRIO_INHERIT); + pthread_mutex_init(&mutex, &attribs); + } + + void lock() + { + pthread_mutex_lock(&mutex); + } + + void unlock() + { + pthread_mutex_unlock(&mutex); + } +}; +*/ + +using DMutex = std::mutex; + +// A "null" mutex, for which locking / unlocking actually does nothing. +class NullMutex +{ + #ifdef __GNUC__ + #ifndef __clang__ + char empty[0]; // Make class instances take up no space (gcc) + #else + char empty[0] __attribute__((unused)); // Make class instances take up no space (clang) + #endif + #endif + + public: + void lock() { } + void unlock() { } + void try_lock() { } +}; + + +} // end of namespace + + +#endif diff --git a/src/dasynq/dasynq.h b/src/dasynq/dasynq.h new file mode 100644 index 0000000..93ded31 --- /dev/null +++ b/src/dasynq/dasynq.h @@ -0,0 +1,1214 @@ +#ifndef DASYNC_H_INCLUDED +#define DASYNC_H_INCLUDED + +#if defined(__OpenBSD__) +#define HAVE_KQUEUE 1 +#endif + +#if defined(__linux__) +#define HAVE_EPOLL 1 +#endif + +#include "dasynq-flags.h" + +#if defined(HAVE_KQUEUE) +#include "dasynq-kqueue.h" +#include "dasynq-childproc.h" +namespace dasynq { + template using Loop = KqueueLoop; + using LoopTraits = KqueueTraits; +} +#elif defined(HAVE_EPOLL) +#include "dasynq-epoll.h" +#include "dasynq-childproc.h" +namespace dasynq { + template using Loop = EpollLoop; + using LoopTraits = EpollTraits; +} +#endif +#include +#include +#include +#include + +#include "dasynq-mutex.h" + + + +// TODO consider using atomic variables instead of explicit locking where appropriate + +// Allow optimisation of empty classes by including this in the body: +// May be included as the last entry for a class which is only +// _potentially_ empty. + +/* +#ifdef __GNUC__ +#ifdef __clang__ +#define EMPTY_BODY private: char empty_fill[0]; +#else +#define EMPTY_BODY private: char empty_fill[0]; +#endif +#else +#define EMPTY_BODY +#endif +*/ + +namespace dasynq { + + +/** + * Values for rearm/disarm return from event handlers + */ +enum class Rearm +{ + /** Re-arm the event watcher so that it receives further events */ + REARM, + /** Disarm the event watcher so that it receives no further events, until it is re-armed explicitly */ + DISARM, + /** Leave in current armed/disarmed state */ + NOOP, + /** Remove the event watcher (and call "removed" callback) */ + REMOVE, + /** The watcher has been removed - don't touch it! */ + REMOVED +// TODO: add a REQUEUE option, which means, "I didn't complete input/output, run me again soon" +}; + + +// Forward declarations: +template class EventLoop; +template class FdWatcher; +template class BidiFdWatcher; +template class SignalWatcher; +template class ChildProcWatcher; + +// Information about a received signal. +// This is essentially a wrapper for the POSIX siginfo_t; its existence allows for mechanisms that receive +// equivalent signal information in a different format (eg signalfd on Linux). +using SigInfo = LoopTraits::SigInfo; + +namespace dprivate { + // (non-public API) + + enum class WatchType + { + SIGNAL, + FD, + CHILD, + SECONDARYFD + }; + + template class EventDispatch; + + // For FD watchers: + // Use this watch flag to indicate that in and out events should be reported separately, + // that is, watcher should not be disabled until all watched event types are queued. + constexpr static int multi_watch = 4; + + // Represents a queued event notification + class BaseWatcher + { + template friend class EventDispatch; + template friend class dasynq::EventLoop; + + protected: + WatchType watchType; + int active : 1; + int deleteme : 1; + + BaseWatcher * prev; + BaseWatcher * next; + + public: + + // Perform initialisation necessary before registration with an event loop + void init() + { + active = false; + deleteme = false; + prev = nullptr; + next = nullptr; + } + + BaseWatcher(WatchType wt) noexcept : watchType(wt) { } + + virtual ~BaseWatcher() noexcept { } + + // Called when the watcher has been removed. + // It is guaranteed by the caller that: + // - the dispatch method is not currently running + // - the dispatch method will not be called. + virtual void watchRemoved() noexcept + { + // TODO this "delete" behaviour could be dependent on a flag, perhaps? + // delete this; + } + }; + + // Base signal event - not part of public API + template + class BaseSignalWatcher : public BaseWatcher + { + template friend class EventDispatch; + friend class dasynq::EventLoop; + + protected: + SigInfo siginfo; + BaseSignalWatcher() : BaseWatcher(WatchType::SIGNAL) { } + + public: + typedef SigInfo &SigInfo_p; + + virtual Rearm gotSignal(EventLoop * eloop, int signo, SigInfo_p siginfo) = 0; + }; + + template + class BaseFdWatcher : public BaseWatcher + { + template friend class EventDispatch; + friend class dasynq::EventLoop; + + protected: + int watch_fd; + int watch_flags; + int event_flags; + + BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { } + + public: + virtual Rearm gotEvent(EventLoop * eloop, int fd, int flags) = 0; + }; + + template + class BaseBidiFdWatcher : public BaseFdWatcher + { + template friend class EventDispatch; + friend class dasynq::EventLoop; + + // This should never actually get called: + Rearm gotEvent(EventLoop * eloop, int fd, int flags) final + { + return Rearm::REARM; // should not be reachable. + }; + + protected: + + // The main instance is the "input" watcher only; we keep a secondary watcher + // with a secondary set of flags for the "output" watcher: + BaseWatcher outWatcher = BaseWatcher(WatchType::SECONDARYFD); + + int read_removed : 1; // read watch removed? + int write_removed : 1; // write watch removed? + + public: + virtual Rearm readReady(EventLoop * eloop, int fd) noexcept = 0; + virtual Rearm writeReady(EventLoop * eloop, int fd) noexcept = 0; + }; + + template + class BaseChildWatcher : public BaseWatcher + { + template friend class EventDispatch; + friend class dasynq::EventLoop; + + protected: + pid_t watch_pid; + int child_status; + + BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { } + + public: + virtual void gotTermStat(EventLoop * eloop, pid_t child, int status) = 0; + }; + + // Classes for implementing a fair(ish) wait queue. + // A queue node can be signalled when it reaches the head of + // the queue. + + template class waitqueue; + template class waitqueue_node; + + // Select an appropriate conditiona variable type for a mutex: + // condition_variable if mutex is std::mutex, or condition_variable_any + // otherwise. + template class condvarSelector; + + template <> class condvarSelector + { + public: + typedef std::condition_variable condvar; + }; + + template class condvarSelector + { + public: + typedef std::condition_variable_any condvar; + }; + + template <> class waitqueue_node + { + // Specialised waitqueue_node for NullMutex. + // TODO can this be reduced to 0 data members? + friend class waitqueue; + waitqueue_node * next = nullptr; + + public: + void wait(std::unique_lock &ul) { } + void signal() { } + }; + + template class waitqueue_node + { + typename condvarSelector::condvar condvar; + friend class waitqueue; + waitqueue_node * next = nullptr; + + public: + void signal() + { + condvar.notify_one(); + } + + void wait(std::unique_lock &mutex_lock) + { + condvar.wait(mutex_lock); + } + }; + + template class waitqueue + { + waitqueue_node * tail = nullptr; + waitqueue_node * head = nullptr; + + public: + waitqueue_node * unqueue() + { + head = head->next; + return head; + } + + waitqueue_node * getHead() + { + return head; + } + + void queue(waitqueue_node *node) + { + if (tail) { + tail->next = node; + } + else { + head = node; + } + } + }; + + // This class serves as the base class (mixin) for the AEN mechanism class. + // Note that EventDispatch, here, and EventLoop (below) are really two sides of one coin; + // they do not work independently. The mixin pattern that we use to avoid dynamic dispatch + // forces them to be two seperate classes, however. + // + // The EventDispatch class maintains the queued event data structures. It inserts watchers + // into the queue when eventes are received (receiveXXX methods). + template class EventDispatch : public Traits + { + friend class EventLoop; + + // queue data structure/pointer + BaseWatcher * first; + + using BaseSignalWatcher = dasynq::dprivate::BaseSignalWatcher; + using BaseFdWatcher = dasynq::dprivate::BaseFdWatcher; + using BaseBidiFdWatcher = dasynq::dprivate::BaseBidiFdWatcher; + using BaseChildWatcher = dasynq::dprivate::BaseChildWatcher; + + void queueWatcher(BaseWatcher *bwatcher) + { + // Put in queue: + if (first == nullptr) { + bwatcher->prev = bwatcher; + bwatcher->next = bwatcher; + first = bwatcher; + } + else { + first->prev->next = bwatcher; + bwatcher->prev = first->prev; + first->prev = bwatcher; + bwatcher->next = first; + } + } + + bool isQueued(BaseWatcher *bwatcher) + { + return bwatcher->prev != nullptr; + } + + void dequeueWatcher(BaseWatcher *bwatcher) + { + if (bwatcher->prev == bwatcher) { + // Only item in queue + first = nullptr; + } + else { + if (first == bwatcher) first = first->next; + bwatcher->prev->next = bwatcher->next; + bwatcher->next->prev = bwatcher->prev; + } + + bwatcher->prev = nullptr; + bwatcher->next = nullptr; + } + + protected: + T_Mutex lock; + + void receiveSignal(typename Traits::SigInfo & siginfo, void * userdata) + { + BaseSignalWatcher * bwatcher = static_cast(userdata); + bwatcher->siginfo = siginfo; + queueWatcher(bwatcher); + } + + template + void receiveFdEvent(T &loop_mech, typename Traits::FD_r fd_r, void * userdata, int flags) + { + BaseFdWatcher * bfdw = static_cast(userdata); + + bfdw->event_flags |= flags; + + BaseWatcher * bwatcher = bfdw; + + bool is_multi_watch = bfdw->watch_flags & multi_watch; + if (is_multi_watch) { + BaseBidiFdWatcher *bbdw = static_cast(bwatcher); + if (flags & IN_EVENTS && flags & OUT_EVENTS) { + // Queue the secondary watcher first: + queueWatcher(&bbdw->outWatcher); + } + else if (flags & OUT_EVENTS) { + // Use the secondary watcher for queueing: + bwatcher = &(bbdw->outWatcher); + } + } + + queueWatcher(bwatcher); + + if (! LoopTraits::has_separate_rw_fd_watches) { + // If this is a bidirectional fd-watch, it has been disabled in *both* directions + // as the event was delivered. However, the other direction should not be disabled + // yet, so we need to re-enable: + int in_out_mask = IN_EVENTS | OUT_EVENTS; + if (is_multi_watch && bfdw->event_flags != (bfdw->watch_flags & in_out_mask)) { + // We need to re-enable the other channel now: + loop_mech.enableFdWatch_nolock(bfdw->watch_fd, userdata, + (bfdw->watch_flags & ~(bfdw->event_flags)) | ONE_SHOT); + } + } + } + + void receiveChildStat(pid_t child, int status, void * userdata) + { + BaseChildWatcher * watcher = static_cast(userdata); + watcher->child_status = status; + queueWatcher(watcher); + } + + // Pull a single event from the queue + BaseWatcher * pullEvent() + { + BaseWatcher * r = first; + if (r != nullptr) { + dequeueWatcher(r); + } + return r; + } + + void issueDelete(BaseWatcher *watcher) noexcept + { + // This is only called when the attention lock is held, so if the watcher is not + // active/queued now, it cannot become active (and will not be reported with an event) + // during execution of this function. + + lock.lock(); + + // TODO this needs to handle multi-watch (BidiFdWatcher) properly + + if (watcher->active) { + // If the watcher is active, set deleteme true; the watcher will be removed + // at the end of current processing (i.e. when active is set false). + watcher->deleteme = true; + lock.unlock(); + } + else { + // Actually do the delete. + if (isQueued(watcher)) { + dequeueWatcher(watcher); + } + + lock.unlock(); + watcher->watchRemoved(); + } + } + + void issueDelete(BaseBidiFdWatcher *watcher) noexcept + { + lock.lock(); + + if (watcher->active) { + watcher->deleteme = true; + } + else { + if (isQueued(watcher)) { + dequeueWatcher(watcher); + } + + watcher->read_removed = true; + } + + BaseWatcher *secondary = &(watcher->outWatcher); + if (secondary->active) { + secondary->deleteme = true; + } + else { + if (isQueued(secondary)) { + dequeueWatcher(secondary); + } + + watcher->write_removed = true; + } + + if (watcher->read_removed && watcher->write_removed) { + lock.unlock(); + watcher->watchRemoved(); + } + else { + lock.unlock(); + } + } + }; +} + + +template class EventLoop +{ + friend class FdWatcher; + friend class BidiFdWatcher; + friend class SignalWatcher; + friend class ChildProcWatcher; + + template using EventDispatch = dprivate::EventDispatch; + template using waitqueue = dprivate::waitqueue; + template using waitqueue_node = dprivate::waitqueue_node; + using BaseWatcher = dprivate::BaseWatcher; + using BaseSignalWatcher = dprivate::BaseSignalWatcher; + using BaseFdWatcher = dprivate::BaseFdWatcher; + using BaseBidiFdWatcher = dprivate::BaseBidiFdWatcher; + using BaseChildWatcher = dprivate::BaseChildWatcher; + using WatchType = dprivate::WatchType; + + Loop>> loop_mech; + + // There is a complex problem with most asynchronous event notification mechanisms + // when used in a multi-threaded environment. Generally, a file descriptor or other + // event type that we are watching will be associated with some data used to manage + // that event source. For example a web server needs to maintain information about + // each client connection, such as the state of the connection (what protocol version + // has been negotiated, etc; if a transfer is taking place, what file is being + // transferred etc). + // + // However, sometimes we want to remove an event source (eg webserver wants to drop + // a connection) and delete the associated data. The problem here is that it is + // difficult to be sure when it is ok to actually remove the data, since when + // requesting to unwatch the source in one thread it is still possible that an + // event from that source is just being reported to another thread (in which case + // the data will be needed). + // + // To solve that, we: + // - allow only one thread to poll for events at a time, using a lock + // - use the same lock to prevent polling, if we want to unwatch an event source + // - generate an event to interrupt any polling that may already be occurring in + // another thread + // - mark handlers as active if they are currently executing, and + // - when removing an active handler, simply set a flag which causes it to be + // removed once the current processing is finished, rather than removing it + // immediately. + // + // In particular the lock mechanism for preventing multiple threads polling and + // for allowing polling to be interrupted is tricky. We can't use a simple mutex + // since there is significant chance that it will be highly contended and there + // are no guarantees that its acquisition will be fair. In particular, we don't + // want a thread that is trying to unwatch a source being starved while another + // thread polls the event source. + // + // So, we use two wait queues protected by a single mutex. The "attn_waitqueue" + // (attention queue) is the high-priority queue, used for threads wanting to + // unwatch event sources. The "wait_waitquueue" is the queue used by threads + // that wish to actually poll for events. + // - The head of the "attn_waitqueue" is always the holder of the lock + // - Therefore, a poll-waiter must be moved from the wait_waitqueue to the + // attn_waitqueue to actually gain the lock. This is only done if the + // attn_waitqueue is otherwise empty. + // - The mutex only protects manipulation of the wait queues, and so should not + // be highly contended. + + T_Mutex wait_lock; // wait lock, used to prevent multiple threads from waiting + // on the event queue simultaneously. + waitqueue attn_waitqueue; + waitqueue wait_waitqueue; + + T_Mutex &getBaseLock() + { + return loop_mech.lock; + } + + void registerSignal(BaseSignalWatcher *callBack, int signo) + { + loop_mech.addSignalWatch(signo, callBack); + } + + void deregister(BaseSignalWatcher *callBack, int signo) noexcept + { + loop_mech.removeSignalWatch(signo); + + waitqueue_node qnode; + getAttnLock(qnode); + + EventDispatch & ed = (EventDispatch &) loop_mech; + ed.issueDelete(callBack); + + releaseLock(qnode); + } + + void registerFd(BaseFdWatcher *callback, int fd, int eventmask) + { + loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT); + } + + void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask) + { + if (LoopTraits::has_separate_rw_fd_watches) { + // TODO + } + else { + loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT); + } + } + + void setFdEnabled(BaseWatcher *watcher, int fd, int watch_flags, bool enabled) + { + if (enabled) { + loop_mech.enableFdWatch(fd, watcher, watch_flags | ONE_SHOT); + } + else { + loop_mech.disableFdWatch(fd, watch_flags); + } + } + + void setFdEnabled_nolock(BaseWatcher *watcher, int fd, int watch_flags, bool enabled) + { + if (enabled) { + loop_mech.enableFdWatch_nolock(fd, watcher, watch_flags | ONE_SHOT); + } + else { + loop_mech.disableFdWatch_nolock(fd, watch_flags); + } + } + + void deregister(BaseFdWatcher *callback, int fd) + { + loop_mech.removeFdWatch(fd, callback->watch_flags); + + waitqueue_node qnode; + getAttnLock(qnode); + + EventDispatch & ed = (EventDispatch &) loop_mech; + ed.issueDelete(callback); + + releaseLock(qnode); + } + + void deregister(BaseBidiFdWatcher *callback, int fd) + { + if (LoopTraits::has_separate_rw_fd_watches) { + // TODO + } + else { + loop_mech.removeFdWatch(fd, callback->watch_flags); + } + + waitqueue_node qnode; + getAttnLock(qnode); + + EventDispatch & ed = (EventDispatch &) loop_mech; + ed.issueDelete(callback); + + releaseLock(qnode); + } + + void reserveChildWatch(BaseChildWatcher *callBack) + { + loop_mech.reserveChildWatch(); + } + + void registerChild(BaseChildWatcher *callBack, pid_t child) + { + loop_mech.addChildWatch(child, callBack); + } + + void registerReservedChild(BaseChildWatcher *callBack, pid_t child) noexcept + { + loop_mech.addReservedChildWatch(child, callBack); + } + + void dequeueWatcher(BaseWatcher *watcher) noexcept + { + if (loop_mech.isQueued(watcher)) { + loop_mech.dequeueWatcher(watcher); + } + } + + // Acquire the attention lock (when held, ensures that no thread is polling the AEN + // mechanism). + void getAttnLock(waitqueue_node &qnode) + { + std::unique_lock ulock(wait_lock); + attn_waitqueue.queue(&qnode); + if (attn_waitqueue.getHead() != &qnode) { + loop_mech.interruptWait(); + while (attn_waitqueue.getHead() != &qnode) { + qnode.wait(ulock); + } + } + } + + // Acquire the poll-wait lock (to be held when polling the AEN mechanism; lower + // priority than the attention lock). + void getPollwaitLock(waitqueue_node &qnode) + { + std::unique_lock ulock(wait_lock); + if (attn_waitqueue.getHead() == nullptr) { + // Queue is completely empty: + attn_waitqueue.queue(&qnode); + } + else { + wait_waitqueue.queue(&qnode); + } + + while (attn_waitqueue.getHead() != &qnode) { + qnode.wait(ulock); + } + } + + // Release the poll-wait/attention lock. + void releaseLock(waitqueue_node &qnode) + { + std::unique_lock ulock(wait_lock); + waitqueue_node * nhead = attn_waitqueue.unqueue(); + if (nhead != nullptr) { + nhead->signal(); + } + else { + nhead = wait_waitqueue.getHead(); + if (nhead != nullptr) { + attn_waitqueue.queue(nhead); + nhead->signal(); + } + } + } + + void processSignalRearm(BaseSignalWatcher * bsw, Rearm rearmType) + { + // Called with lock held + if (rearmType == Rearm::REARM) { + loop_mech.rearmSignalWatch_nolock(bsw->siginfo.get_signo()); + } + else if (rearmType == Rearm::REMOVE) { + loop_mech.removeSignalWatch_nolock(bsw->siginfo.get_signo()); + } + } + + Rearm processFdRearm(BaseFdWatcher * bfw, Rearm rearmType, bool is_multi_watch) + { + // Called with lock held + if (is_multi_watch) { + BaseBidiFdWatcher * bdfw = static_cast(bfw); + + if (rearmType == Rearm::REMOVE) { + bdfw->read_removed = 1; + bdfw->watch_flags &= ~IN_EVENTS; + + if (! LoopTraits::has_separate_rw_fd_watches) { + if (! bdfw->write_removed) { + return Rearm::NOOP; + } + else { + // both removed: actually remove + loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */); + return Rearm::REMOVE; + } + } + else { + // TODO this will need flags for such a loop, since it can't + // otherwise distinguish which channel watch to remove + loop_mech.removeFdWatch_nolock(bdfw->watch_fd, bdfw->watch_flags); + } + } + else if (rearmType == Rearm::DISARM) { + // Nothing more to do + } + else if (rearmType == Rearm::REARM) { + bdfw->watch_flags |= IN_EVENTS; + if (! LoopTraits::has_separate_rw_fd_watches) { + loop_mech.enableFdWatch_nolock(bdfw->watch_fd, + static_cast(bdfw), + (bdfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); + } + else { + loop_mech.enableFdWatch_nolock(bdfw->watch_fd, + static_cast(bdfw), + IN_EVENTS | ONE_SHOT); + } + } + return rearmType; + } + else { + if (rearmType == Rearm::REARM) { + loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw, + (bfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); + } + else if (rearmType == Rearm::REMOVE) { + loop_mech.removeFdWatch_nolock(bfw->watch_fd, bfw->watch_flags); + } + return rearmType; + } + } + + Rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, Rearm rearmType) + { + // Called with lock held + if (rearmType == Rearm::REMOVE) { + bdfw->write_removed = 1; + bdfw->watch_flags &= ~OUT_EVENTS; + + if (LoopTraits::has_separate_rw_fd_watches) { + // TODO this will need flags for such a loop, since it can't + // otherwise distinguish which channel watch to remove + loop_mech.removeFdWatch_nolock(bdfw->watch_fd, bdfw->watch_flags); + return bdfw->read_removed ? Rearm::REMOVE : Rearm::NOOP; + } + else { + if (! bdfw->read_removed) { + return Rearm::NOOP; + } + else { + // both removed: actually remove + loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */); + return Rearm::REMOVE; + } + } + } + else if (rearmType == Rearm::DISARM) { + // Nothing more to do + } + else if (rearmType == Rearm::REARM) { + bdfw->watch_flags |= OUT_EVENTS; + if (! LoopTraits::has_separate_rw_fd_watches) { + loop_mech.enableFdWatch_nolock(bdfw->watch_fd, + static_cast(bdfw), + (bdfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); + } + else { + loop_mech.enableFdWatch_nolock(bdfw->watch_fd, + static_cast(bdfw), + OUT_EVENTS | ONE_SHOT); + } + } + return rearmType; + } + + bool processEvents() noexcept + { + EventDispatch & ed = (EventDispatch &) loop_mech; + ed.lock.lock(); + + // So this pulls *all* currently pending events and processes them in the current thread. + // That's probably good for throughput, but maybe the behavior should be configurable. + + BaseWatcher * pqueue = ed.pullEvent(); + bool active = false; + + while (pqueue != nullptr) { + + pqueue->active = true; + active = true; + + Rearm rearmType = Rearm::NOOP; + bool is_multi_watch = false; + BaseBidiFdWatcher *bbfw = nullptr; + + // (Above variables are initialised only to silence compiler warnings). + + // Read/manipulate watch_flags (if necessary) *before* we release the lock: + if (pqueue->watchType == WatchType::FD) { + BaseFdWatcher *bfw = static_cast(pqueue); + bbfw = static_cast(bfw); + is_multi_watch = bfw->watch_flags & dprivate::multi_watch; + if (! LoopTraits::has_separate_rw_fd_watches && is_multi_watch) { + // Clear the input watch flags to avoid enabling read watcher while active: + bfw->watch_flags &= ~IN_EVENTS; + } + } + else if (pqueue->watchType == WatchType::SECONDARYFD) { + is_multi_watch = true; + char * rp = (char *)pqueue; + rp -= offsetof(BaseBidiFdWatcher, outWatcher); + bbfw = (BaseBidiFdWatcher *)rp; + if (! LoopTraits::has_separate_rw_fd_watches) { + bbfw->watch_flags &= ~OUT_EVENTS; + } + } + + ed.lock.unlock(); + + // Note that we select actions based on the type of the watch, as determined by the watchType + // member. In some ways this screams out for polmorphism; a virtual function could be overridden + // by each of the watcher types. I've instead used switch/case because I think it will perform + // slightly better without the overhead of a virtual function dispatch, but it's got to be a + // close call; I might be guilty of premature optimisation here. + + switch (pqueue->watchType) { + case WatchType::SIGNAL: { + BaseSignalWatcher *bsw = static_cast(pqueue); + rearmType = bsw->gotSignal(this, bsw->siginfo.get_signo(), bsw->siginfo); + break; + } + case WatchType::FD: { + BaseFdWatcher *bfw = static_cast(pqueue); + if (is_multi_watch) { + // The primary watcher for a multi-watch watcher is queued for + // read events. + rearmType = bbfw->readReady(this, bfw->watch_fd); + } + else { + rearmType = bfw->gotEvent(this, bfw->watch_fd, bfw->event_flags); + } + break; + } + case WatchType::CHILD: { + BaseChildWatcher *bcw = static_cast(pqueue); + bcw->gotTermStat(this, bcw->watch_pid, bcw->child_status); + // Child watches automatically remove: + // TODO what if they want to return REMOVED... + rearmType = Rearm::REMOVE; + break; + } + case WatchType::SECONDARYFD: { + // first construct a pointer to the main watcher: + rearmType = bbfw->writeReady(this, bbfw->watch_fd); + break; + } + default: ; + } + + ed.lock.lock(); + + // (if REMOVED, we must not touch pqueue at all) + if (rearmType != Rearm::REMOVED) { + + pqueue->active = false; + if (pqueue->deleteme) { + // We don't want a watch that is marked "deleteme" to re-arm itself. + rearmType = Rearm::REMOVE; + } + switch (pqueue->watchType) { + case WatchType::SIGNAL: + processSignalRearm(static_cast(pqueue), rearmType); + break; + case WatchType::FD: + rearmType = processFdRearm(static_cast(pqueue), rearmType, is_multi_watch); + break; + case WatchType::SECONDARYFD: + rearmType = processSecondaryRearm(bbfw, rearmType); + break; + default: ; + } + + if (pqueue->deleteme || rearmType == Rearm::REMOVE) { + ed.lock.unlock(); + (is_multi_watch ? bbfw : pqueue)->watchRemoved(); + ed.lock.lock(); + } + } + + pqueue = ed.pullEvent(); + } + + ed.lock.unlock(); + return active; + } + + + public: + void run() noexcept + { + while (! processEvents()) { + waitqueue_node qnode; + + // We only allow one thread to poll the mechanism at any time, since otherwise + // removing event watchers is a nightmare beyond comprehension. + getPollwaitLock(qnode); + + // Pull events from the AEN mechanism and insert them in our internal queue: + loop_mech.pullEvents(true); + + // Now release the wait lock: + releaseLock(qnode); + } + } +}; + + +typedef EventLoop NEventLoop; +typedef EventLoop TEventLoop; + +// from dasync.cc: +TEventLoop & getSystemLoop(); + +// Posix signal event watcher +template +class SignalWatcher : private dprivate::BaseSignalWatcher +{ + using BaseWatcher = dprivate::BaseWatcher; + +public: + using SigInfo_p = typename dprivate::BaseSignalWatcher::SigInfo_p; + + // Register this watcher to watch the specified signal. + // If an attempt is made to register with more than one event loop at + // a time, behaviour is undefined. + inline void registerWatch(EventLoop *eloop, int signo) + { + BaseWatcher::init(); + this->siginfo.set_signo(signo); + eloop->registerSignal(this, signo); + } + + inline void deregisterWatch(EventLoop *eloop) noexcept + { + eloop->deregister(this, this->siginfo.get_signo()); + } + + // virtual Rearm gotSignal(EventLoop *, int signo, SigInfo_p info) = 0; +}; + +// Posix file descriptor event watcher +template +class FdWatcher : private dprivate::BaseFdWatcher +{ + using BaseWatcher = dprivate::BaseWatcher; + + protected: + + // Set the types of event to watch. Only supported if LoopTraits::has_bidi_fd_watch + // is true; otherwise has unspecified behavior. + // Only safe to call from within the callback handler (gotEvent). Might not take + // effect until the current callback handler returns with REARM. + void setWatchFlags(int newFlags) + { + this->watch_flags = newFlags; + } + + public: + + // Register a file descriptor watcher with an event loop. Flags + // can be any combination of dasynq::IN_EVENTS / dasynq::OUT_EVENTS. + // Exactly one of IN_EVENTS/OUT_EVENTS must be specified if the event + // loop does not support bi-directional fd watchers (i.e. if + // ! LoopTraits::has_bidi_fd_watch). + // + // Mechanisms supporting dual watchers allow for two watchers for a + // single file descriptor (one watching read status and the other + // write status). Others mechanisms support only a single watcher + // per file descriptor. Adding a watcher beyond what is supported + // causes undefined behavior. + // + // Can fail with std::bad_alloc or std::system_error. + void registerWith(EventLoop *eloop, int fd, int flags) + { + BaseWatcher::init(); + this->watch_fd = fd; + this->watch_flags = flags; + eloop->registerFd(this, fd, flags); + } + + // Deregister a file descriptor watcher. + // + // If other threads may be polling the event loop, it is not safe to assume + // the watcher is unregistered until the watchRemoved() callback is issued + // (which will not occur until the event handler returns, if it is active). + // In a single threaded environment, it is safe to delete the watcher after + // calling this method as long as the handler (if it is active) accesses no + // internal state and returns Rearm::REMOVED. + // TODO: implement REMOVED, or correct above statement. + void deregisterWatch(EventLoop *eloop) noexcept + { + eloop->deregister(this, this->watch_fd); + } + + void setEnabled(EventLoop *eloop, bool enable) noexcept + { + std::lock_guard guard(eloop->getBaseLock()); + eloop->setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable); + if (! enable) { + eloop->dequeueWatcher(this); + } + } + + // virtual Rearm gotEvent(EventLoop *, int fd, int flags) = 0; +}; + +// A Bi-directional file descriptor watcher with independent read- and write- channels. +// This watcher type has two event notification methods which can both potentially be +// active at the same time. +template +class BidiFdWatcher : private dprivate::BaseBidiFdWatcher +{ + using BaseWatcher = dprivate::BaseWatcher; + + void setWatchEnabled(EventLoop *eloop, bool in, bool b) + { + int events = in ? IN_EVENTS : OUT_EVENTS; + + if (b) { + this->watch_flags |= events; + } + else { + this->watch_flags &= ~events; + } + if (LoopTraits::has_separate_rw_fd_watches) { + dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher; + eloop->setFdEnabled_nolock(watcher, this->watch_fd, events | ONE_SHOT, b); + if (! b) { + eloop->dequeueWatcher(watcher); + } + } + else { + eloop->setFdEnabled_nolock(this, this->watch_fd, + (this->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT, + (this->watch_flags & (IN_EVENTS | OUT_EVENTS)) != 0); + if (! b) { + dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher; + eloop->dequeueWatcher(watcher); + } + } + } + + protected: + + // TODO if a watch is disabled and currently queued, we should de-queue it. + + void setInWatchEnabled(EventLoop *eloop, bool b) noexcept + { + eloop->getBaseLock().lock(); + setWatchEnabled(eloop, true, b); + eloop->getBaseLock().unlock(); + } + + void setOutWatchEnabled(EventLoop *eloop, bool b) noexcept + { + eloop->getBaseLock().lock(); + setWatchEnabled(eloop, false, b); + eloop->getBaseLock().unlock(); + } + + // Set the watch flags, which enables/disables both the in-watch and the out-watch accordingly. + // + // Concurrency: this method can only be called if + // - it does not enable a watcher that might currently be active + /// - unless the event loop will not be polled while the watcher is active. + // (i.e. it is ok to call setWatchFlags from within the readReady/writeReady handlers if no other + // thread will poll the event loop; it is ok to *dis*able a watcher that might be active). + void setWatchFlags(EventLoop * eloop, int newFlags) + { + std::lock_guard guard(eloop->getBaseLock()); + if (LoopTraits::has_separate_rw_fd_watches) { + setWatchEnabled(eloop, true, (newFlags & IN_EVENTS) != 0); + setWatchEnabled(eloop, false, (newFlags & OUT_EVENTS) != 0); + } + else { + this->watch_flags = (this->watch_flags & ~IO_EVENTS) | newFlags; + eloop->setFdEnabled((dprivate::BaseWatcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true); + } + } + + public: + + // Register a bi-direction file descriptor watcher with an event loop. Flags + // can be any combination of dasynq::IN_EVENTS / dasynq::OUT_EVENTS. + // + // Can fail with std::bad_alloc or std::system_error. + void registerWith(EventLoop *eloop, int fd, int flags) + { + BaseWatcher::init(); + this->outWatcher.BaseWatcher::init(); + this->watch_fd = fd; + this->watch_flags = flags | dprivate::multi_watch; + eloop->registerFd(this, fd, flags); + } + + // Deregister a bi-direction file descriptor watcher. + // + // If other threads may be polling the event loop, it is not safe to assume + // the watcher is unregistered until the watchRemoved() callback is issued + // (which will not occur until the event handler returns, if it is active). + // In a single threaded environment, it is safe to delete the watcher after + // calling this method as long as the handler (if it is active) accesses no + // internal state and returns Rearm::REMOVED. + // TODO: implement REMOVED, or correct above statement. + void deregisterWatch(EventLoop *eloop) noexcept + { + eloop->deregister(this, this->watch_fd); + } + + // Rearm readReady(EventLoop * eloop, int fd) noexcept + // Rearm writeReady(EventLoop * eloop, int fd) noexcept +}; + +// Child process event watcher +template +class ChildProcWatcher : private dprivate::BaseChildWatcher +{ + using BaseWatcher = dprivate::BaseWatcher; + + public: + // Reserve resources for a child watcher with the given event loop. + // Reservation can fail with std::bad_alloc. + void reserveWith(EventLoop *eloop) + { + eloop->reserveChildWatch(); + } + + // Register a watcher for the given child process with an event loop. + // Registration can fail with std::bad_alloc. + void registerWith(EventLoop *eloop, pid_t child) + { + BaseWatcher::init(); + this->watch_pid = child; + eloop->registerChild(this, child); + } + + // Register a watcher for the given child process with an event loop, + // after having reserved resources previously (using reserveWith). + // Registration cannot fail. + void registerReserved(EventLoop *eloop, pid_t child) noexcept + { + BaseWatcher::init(); + eloop->registerReservedChild(this, child); + } + + // virtual void gotTermStat(EventLoop *, pid_t child, int status) = 0; +}; + +} // namespace dasynq + +#endif diff --git a/src/dinit-log.cc b/src/dinit-log.cc index 826a1ac..07cfed8 100644 --- a/src/dinit-log.cc +++ b/src/dinit-log.cc @@ -6,7 +6,7 @@ #include #include -#include "dasync.h" +#include "dasynq.h" #include "service.h" #include "dinit-log.h" @@ -20,7 +20,7 @@ LogLevel log_level[2] = { LogLevel::WARN, LogLevel::WARN }; static ServiceSet *service_set = nullptr; // Reference to service set namespace { -class BufferedLogStream : public PosixFdWatcher +class BufferedLogStream : public FdWatcher { private: @@ -106,7 +106,7 @@ void BufferedLogStream::flushForRelease() // Try to flush any messages that are currently buffered. (Console is non-blocking // so it will fail gracefully). - if (gotEvent(&eventLoop, fd, out_events) == Rearm::DISARM) { + if (gotEvent(&eventLoop, fd, OUT_EVENTS) == Rearm::DISARM) { // Console has already been released at this point. setEnabled(&eventLoop, false); } @@ -223,7 +223,7 @@ Rearm BufferedLogStream::gotEvent(EventLoop_t *loop, int fd, int flags) noexcept void init_log(ServiceSet *sset) { service_set = sset; - log_stream[DLOG_CONS].registerWith(&eventLoop, STDOUT_FILENO, out_events); // TODO register in disabled state + log_stream[DLOG_CONS].registerWith(&eventLoop, STDOUT_FILENO, OUT_EVENTS); // TODO register in disabled state enable_console_log(true); } @@ -232,7 +232,7 @@ void init_log(ServiceSet *sset) void setup_main_log(int fd) { log_stream[DLOG_MAIN].init(fd); - log_stream[DLOG_MAIN].registerWith(&eventLoop, fd, out_events); + log_stream[DLOG_MAIN].registerWith(&eventLoop, fd, OUT_EVENTS); } bool is_log_flushed() noexcept diff --git a/src/dinit.cc b/src/dinit.cc index daaa0a0..a87b4f7 100644 --- a/src/dinit.cc +++ b/src/dinit.cc @@ -13,7 +13,7 @@ #include #include -#include "dasync.h" +#include "dasynq.h" #include "service.h" #include "control.h" #include "dinit-log.h" @@ -60,7 +60,7 @@ */ -using namespace dasync; +using namespace dasynq; using EventLoop_t = EventLoop; EventLoop_t eventLoop = EventLoop_t(); @@ -73,7 +73,7 @@ static void close_control_socket(EventLoop_t *loop) noexcept; static void control_socket_cb(EventLoop_t *loop, int fd); -class ControlSocketWatcher : public PosixFdWatcher +class ControlSocketWatcher : public FdWatcher { Rearm gotEvent(EventLoop_t * loop, int fd, int flags) { @@ -88,7 +88,7 @@ class ControlSocketWatcher : public PosixFdWatcher void registerWith(EventLoop_t * loop, int fd, int flags) { this->fd = fd; - PosixFdWatcher::registerWith(loop, fd, flags); + FdWatcher::registerWith(loop, fd, flags); } }; @@ -133,7 +133,7 @@ const char * get_user_home() namespace { - class CallbackSignalHandler : public PosixSignalWatcher + class CallbackSignalHandler : public SignalWatcher { public: typedef void (*cb_func_t)(EventLoop_t *); @@ -157,7 +157,7 @@ namespace { } }; - class ControlSocketWatcher : public PosixFdWatcher + class ControlSocketWatcher : public FdWatcher { Rearm gotEvent(EventLoop_t * loop, int fd, int flags) { @@ -505,7 +505,7 @@ static void open_control_socket(EventLoop_t *loop) noexcept } control_socket_open = true; - control_socket_io.registerWith(&eventLoop, sockfd, in_events); + control_socket_io.registerWith(&eventLoop, sockfd, IN_EVENTS); } } diff --git a/src/service.cc b/src/service.cc index 746a3b6..92328a2 100644 --- a/src/service.cc +++ b/src/service.cc @@ -726,7 +726,7 @@ bool ServiceRecord::start_ps_process(const std::vector &cmd, bool // Listen for status // TODO should set this up earlier so we can handle failure case (exception) - child_status_listener.registerWith(&eventLoop, pipefd[0], in_events); + child_status_listener.registerWith(&eventLoop, pipefd[0], IN_EVENTS); // Add a process listener so we can detect when the // service stops diff --git a/src/service.h b/src/service.h index 7c4ca90..f796581 100644 --- a/src/service.h +++ b/src/service.h @@ -7,7 +7,7 @@ #include #include -#include "dasync.h" +#include "dasynq.h" #include "control.h" #include "service-listener.h" @@ -180,7 +180,7 @@ static std::vector separate_args(std::string &s, std::list +class ServiceChildWatcher : public ChildProcWatcher { public: // TODO resolve clunkiness of storing this field @@ -190,7 +190,7 @@ class ServiceChildWatcher : public PosixChildWatcher ServiceChildWatcher(ServiceRecord * sr) noexcept : service(sr) { } }; -class ServiceIoWatcher : public PosixFdWatcher +class ServiceIoWatcher : public FdWatcher { public: // TODO resolve clunkiness of storing these fields @@ -203,7 +203,7 @@ class ServiceIoWatcher : public PosixFdWatcher void registerWith(EventLoop_t *loop, int fd, int flags) { this->fd = fd; - PosixFdWatcher::registerWith(loop, fd, flags); + FdWatcher::registerWith(loop, fd, flags); } };