From: Davin McCall Date: Thu, 1 Jun 2017 07:52:22 +0000 (+0100) Subject: Incorporate Dasynq changes X-Git-Tag: v0.06~112 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=ebf6e5294b500eada06090c32be214fa1e54965c;p=oweals%2Fdinit.git Incorporate Dasynq changes --- diff --git a/src/dasynq/dasynq-childproc.h b/src/dasynq/dasynq-childproc.h index 62e5f63..de663db 100644 --- a/src/dasynq/dasynq-childproc.h +++ b/src/dasynq/dasynq-childproc.h @@ -101,7 +101,7 @@ template class ChildProcEvents : public Base using SigInfo = typename Base::SigInfo; template - bool receiveSignal(T & loop_mech, SigInfo &siginfo, void *userdata) + bool receive_signal(T & loop_mech, SigInfo &siginfo, void *userdata) { if (siginfo.get_signo() == SIGCHLD) { int status; @@ -115,7 +115,7 @@ template class ChildProcEvents : public Base return false; // leave signal watch enabled } else { - return Base::receiveSignal(loop_mech, siginfo, userdata); + return Base::receive_signal(loop_mech, siginfo, userdata); } } diff --git a/src/dasynq/dasynq-config.h b/src/dasynq/dasynq-config.h index 0ef66cd..6c1cbca 100644 --- a/src/dasynq/dasynq-config.h +++ b/src/dasynq/dasynq-config.h @@ -1,7 +1,7 @@ #ifndef DASYNQ_CONFIG_H_INCLUDED #define DASYNQ_CONFIG_H_INCLUDED -#if defined(__OpenBSD__) || defined(__APPLE__) +#if defined(__OpenBSD__) || defined(__APPLE__) || defined(__FreeBSD__) #define DASYNQ_HAVE_KQUEUE 1 #endif diff --git a/src/dasynq/dasynq-epoll.h b/src/dasynq/dasynq-epoll.h index c66e85e..cdc7046 100644 --- a/src/dasynq/dasynq-epoll.h +++ b/src/dasynq/dasynq-epoll.h @@ -78,7 +78,7 @@ template class EpollLoop : public Base // Base contains: // lock - a lock that can be used to protect internal structure. // receive*() methods will be called with lock held. - // receiveSignal(SigInfo &, user *) noexcept + // receive_signal(SigInfo &, user *) noexcept // receiveFdEvent(FD_r, user *, int flags) noexcept using SigInfo = EpollTraits::SigInfo; @@ -100,7 +100,7 @@ template class EpollLoop : public Base auto iter = sigdataMap.find(siginfo.get_signo()); if (iter != sigdataMap.end()) { void *userdata = (*iter).second; - if (Base::receiveSignal(*this, siginfo, userdata)) { + if (Base::receive_signal(*this, siginfo, userdata)) { sigdelset(&sigmask, siginfo.get_signo()); } } @@ -143,8 +143,14 @@ template class EpollLoop : public Base } } - // flags: IN_EVENTS | OUT_EVENTS | ONE_SHOT - void addFdWatch(int fd, void *userdata, int flags, bool enabled = true) + // fd: file descriptor to watch + // userdata: data to associate with descriptor + // flags: IN_EVENTS | OUT_EVENTS | ONE_SHOT + // soft_fail: true if unsupported file descriptors should fail by returning false instead + // of throwing an exception + // returns: true on success; false if file descriptor type isn't supported and soft_fail == true + // throws: std::system_error or std::bad_alloc on failure + bool addFdWatch(int fd, void *userdata, int flags, bool enabled = true, bool soft_fail = false) { struct epoll_event epevent; // epevent.data.fd = fd; @@ -162,11 +168,15 @@ template class EpollLoop : public Base } if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epevent) == -1) { + if (soft_fail && errno == EPERM) { + return false; + } throw new std::system_error(errno, std::system_category()); } + return true; } - void addBidiFdWatch(int fd, void *userdata, int flags) + bool addBidiFdWatch(int fd, void *userdata, int flags, bool emulate) { // No implementation. throw std::system_error(std::make_error_code(std::errc::not_supported)); @@ -309,25 +319,10 @@ template class EpollLoop : public Base return; } - processEvents(events, r); - } - - // If events are pending, process one of them. - // If no events are pending, wait until one event is received and - // process this event. - // - // do_wait - if false, returns immediately if no events are - // pending. - void pullOneEvent(bool do_wait) - { - epoll_event events[1]; - int r = epoll_wait(epfd, events, 1, do_wait ? -1 : 0); - if (r == -1 || r == 0) { - // signal or no events - return; - } - - processEvents(events, r); + do { + processEvents(events, r); + r = epoll_wait(epfd, events, 16, 0); + } while (r > 0); } }; diff --git a/src/dasynq/dasynq-itimer.h b/src/dasynq/dasynq-itimer.h index 4f14978..4045097 100644 --- a/src/dasynq/dasynq-itimer.h +++ b/src/dasynq/dasynq-itimer.h @@ -8,14 +8,27 @@ namespace dasynq { -// Timer implementation based on the (basically obselete) POSIX itimer interface. +// Timer implementation based on the (basically obsolete) POSIX itimer interface. template class ITimerEvents : public timer_base { private: - int timerfd_fd = -1; - timer_queue_t timer_queue; + +#if defined(CLOCK_MONOTONIC) + static inline void get_curtime(struct timespec &curtime) + { + clock_gettime(CLOCK_MONOTONIC, &curtime); + } +#else + static inline void get_curtime(struct timespec &curtime) + { + struct timeval curtime_tv; + gettimeofday(&curtime_tv, nullptr); + curtime.tv_sec = curtime_tv.tv_sec; + curtime.tv_nsec = curtime_tv.tv_usec * 1000; + } +#endif // Set the timerfd timeout to match the first timer in the queue (disable the timerfd // if there are no active timers). @@ -33,14 +46,7 @@ template class ITimerEvents : public timer_base newtime = timer_queue.get_root_priority(); struct timespec curtime; -#if defined(__APPLE__) - struct timeval curtime_tv; - gettimeofday(&curtime_tv, nullptr); - curtime.tv_sec = curtime_tv.tv_sec; - curtime.tv_nsec = curtime_tv.tv_usec * 1000; -#else - clock_gettime(CLOCK_MONOTONIC, &curtime); -#endif + get_curtime(curtime); newalarm.it_interval = {0, 0}; newalarm.it_value.tv_sec = newtime.tv_sec - curtime.tv_sec; newalarm.it_value.tv_usec = (newtime.tv_nsec - curtime.tv_nsec) / 1000; @@ -57,19 +63,11 @@ template class ITimerEvents : public timer_base using SigInfo = typename Base::SigInfo; template - bool receiveSignal(T & loop_mech, SigInfo &siginfo, void *userdata) + bool receive_signal(T & loop_mech, SigInfo &siginfo, void *userdata) { if (siginfo.get_signo() == SIGALRM) { struct timespec curtime; -#if defined(__APPLE__) - struct timeval curtime_tv; - gettimeofday(&curtime_tv, nullptr); - curtime.tv_sec = curtime_tv.tv_sec; - curtime.tv_nsec = curtime_tv.tv_usec * 1000; -#else - clock_gettime(CLOCK_MONOTONIC, &curtime); - // should we use the REALTIME clock instead? I have no idea :/ -#endif + get_curtime(curtime); timer_base::process_timer_queue(timer_queue, curtime); @@ -79,7 +77,7 @@ template class ITimerEvents : public timer_base return false; // don't disable signal watch } else { - return Base::receiveSignal(loop_mech, siginfo, userdata); + return Base::receive_signal(loop_mech, siginfo, userdata); } } @@ -146,21 +144,14 @@ template class ITimerEvents : public timer_base { // TODO consider caching current time somehow; need to decide then when to update cached value. struct timespec curtime; -#if defined(__APPLE__) - struct timeval curtime_tv; - gettimeofday(&curtime_tv, nullptr); - curtime.tv_sec = curtime_tv.tv_sec; - curtime.tv_nsec = curtime_tv.tv_usec * 1000; -#else - clock_gettime(CLOCK_MONOTONIC, &curtime); -#endif + get_curtime(curtime); curtime.tv_sec += timeout.tv_sec; curtime.tv_nsec += timeout.tv_nsec; if (curtime.tv_nsec > 1000000000) { curtime.tv_nsec -= 1000000000; curtime.tv_sec++; } - setTimer(timer_id, curtime, interval, enable); + setTimer(timer_id, curtime, interval, enable, clock); } // Enables or disabling report of timeouts (does not stop timer) @@ -199,6 +190,11 @@ template class ITimerEvents : public timer_base } } } + + void get_time(timespec &ts, clock_type clock, bool force_update) noexcept + { + get_curtime(ts); + } }; } diff --git a/src/dasynq/dasynq-kqueue.h b/src/dasynq/dasynq-kqueue.h index 6493ba8..a098a08 100644 --- a/src/dasynq/dasynq-kqueue.h +++ b/src/dasynq/dasynq-kqueue.h @@ -22,6 +22,18 @@ extern "C" { #include "dasynq-config.h" +// "kqueue"-based event loop mechanism. +// +// kqueue is available on BSDs and Mac OS X, though there are subtle differences from OS to OS. +// +// kqueue supports watching file descriptors (input and output as separate watches only), +// signals, child processes, and timers. Unfortunately support for the latter two is imperfect; +// it is not possible to reserve process watches in advance; timers can only be active, count +// down immediately when created, and cannot be reset to another time. For timers especially +// the problems are significant: we can't allocate timers in advance, and we can't even feasibly +// manage our own timer queue via a single kqueue-backed timer. Therefore, an alternate timer +// mechanism must be used together with kqueue. + namespace dasynq { template class KqueueLoop; @@ -74,13 +86,13 @@ class KqueueTraits const static bool supports_childwatch_reservation = true; }; -#if defined(__OpenBSD__) +#if defined(__OpenBSD__) && _POSIX_REALTIME_SIGNALS <= 0 // OpenBSD has no sigtimedwait (or sigwaitinfo) but does have "__thrsigdivert", which is // essentially an incomplete version of the same thing. Discussion with OpenBSD developer -// Ted Unangst suggested that the siginfo_t structure returned might not always have all -// fields set correctly. Furthermore there is a bug such that specifying a zero timeout (or -// indeed any timeout less than a tick) results in NO timeout. We get around this by instead -// specifying an *invalid* timeout, which won't error out if a signal is pending. +// Ted Unangst suggested that the siginfo_t structure returned might not always have all fields +// set correctly. Furthermore there is a bug (at least in 5.9) such that specifying a zero +// timeout (or indeed any timeout less than a tick) results in NO timeout. We get around this by +// instead specifying an *invalid* timeout, which won't error out if a signal is pending. static inline int sigtimedwait(const sigset_t *ssp, siginfo_t *info, struct timespec *timeout) { // We know that we're only called with a timeout of 0 (which doesn't work properly) and @@ -89,7 +101,9 @@ static inline int sigtimedwait(const sigset_t *ssp, siginfo_t *info, struct time timeout->tv_nsec = 1000000001; return __thrsigdivert(*ssp, info, timeout); } +#endif +#if defined(__OpenBSD__) || _POSIX_REALTIME_SIGNALS > 0 static inline void prepare_signal(int signo) { } static inline void unprep_signal(int signo) { } @@ -104,8 +118,10 @@ static bool get_siginfo(int signo, siginfo_t *siginfo) sigaddset(&mask, signo); return (sigtimedwait(&mask, siginfo, &timeout) != -1); } +#else -#elif defined(__APPLE__) +// If we have no sigtimedwait implementation, we have to retrieve signal data by establishing a +// signal handler: static siginfo_t * siginfo_p; @@ -178,7 +194,7 @@ template class KqueueLoop : public Base if (events[i].filter == EVFILT_SIGNAL) { SigInfo siginfo; if (get_siginfo(events[i].ident, &siginfo.info) - && Base::receiveSignal(*this, siginfo, (void *)events[i].udata)) { + && Base::receive_signal(*this, siginfo, (void *)events[i].udata)) { sigdelset(&sigmask, events[i].ident); events[i].flags = EV_DISABLE; } @@ -241,33 +257,103 @@ template class KqueueLoop : public Base kevent(kqfd, &kev, 1, nullptr, 0, nullptr); } - // flags: IN_EVENTS | OUT_EVENTS - void addFdWatch(int fd, void *userdata, int flags, bool enabled = true) + // fd: file descriptor to watch + // userdata: data to associate with descriptor + // flags: IN_EVENTS | OUT_EVENTS | ONE_SHOT + // (only one of IN_EVENTS/OUT_EVENTS can be specified) + // soft_fail: true if unsupported file descriptors should fail by returning false instead + // of throwing an exception + // returns: true on success; false if file descriptor type isn't supported and soft_fail == true + // throws: std::system_error or std::bad_alloc on failure + bool addFdWatch(int fd, void *userdata, int flags, bool enabled = true, bool emulate = false) { - // TODO kqueue doesn't support EVFILT_WRITE on file fd's :/ - // Presumably they cause the kevent call to fail. We could maintain - // a separate set and use poll() (urgh). - short filter = (flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE; - + struct kevent kev; EV_SET(&kev, fd, filter, EV_ADD | (enabled ? 0 : EV_DISABLE), 0, 0, userdata); if (kevent(kqfd, &kev, 1, nullptr, 0, nullptr) == -1) { + // Note that kqueue supports EVFILT_READ on regular file fd's, but not EVFIL_WRITE. + if (filter == EVFILT_WRITE && errno == EINVAL) { + return false; // emulate + } throw new std::system_error(errno, std::system_category()); } + return true; } - - void addBidiFdWatch(int fd, void *userdata, int flags) + + // returns: 0 on success + // IN_EVENTS if in watch requires emulation + // OUT_EVENTS if out watch requires emulation + int addBidiFdWatch(int fd, void *userdata, int flags, bool emulate = false) { +#ifdef EV_RECEIPT struct kevent kev[2]; + struct kevent kev_r[2]; + short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT; + short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT; + EV_SET(&kev[0], fd, EVFILT_READ, rflags, 0, 0, userdata); + EV_SET(&kev[1], fd, EVFILT_WRITE, wflags, 0, 0, userdata); + + int r = kevent(kqfd, kev, 2, kev_r, 2, nullptr); + + if (r == -1) { + throw new std::system_error(errno, std::system_category()); + } + + // Some possibilities: + // - both ends failed. We'll throw an error rather than allowing emulation. + // - read watch failed, write succeeded : should not happen. + // - read watch added, write failed: if emulate == true, succeed; + // if emulate == false, remove read and fail. + + if (kev_r[0].data != 0) { + // read failed + throw new std::system_error(kev_r[0].data, std::system_category()); + } + + if (kev_r[1].data != 0) { + if (emulate) { + return OUT_EVENTS; + } + // remove read watch + EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata); + kevent(kqfd, kev, 1, nullptr, 0, nullptr); + // throw exception + throw new std::system_error(kev_r[1].data, std::system_category()); + } + + return 0; +#else + // OpenBSD doesn't have EV_RECEIPT: install the watches one at a time + struct kevent kev[1]; + short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE); short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE); EV_SET(&kev[0], fd, EVFILT_READ, rflags, 0, 0, userdata); - EV_SET(&kev[1], fd, EVFILT_WRITE, wflags, 0, 0, userdata); - - if (kevent(kqfd, kev, 2, nullptr, 0, nullptr) == -1) { + + int r = kevent(kqfd, kev, 1, nullptr, 0, nullptr); + + if (r == -1) { throw new std::system_error(errno, std::system_category()); - } + } + + EV_SET(&kev[0], fd, EVFILT_WRITE, wflags, 0, 0, userdata); + + r = kevent(kqfd, kev, 1, nullptr, 0, nullptr); + + if (r == -1) { + if (emulate) { + return OUT_EVENTS; + } + // remove read watch + EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata); + kevent(kqfd, kev, 1, nullptr, 0, nullptr); + // throw exception + throw new std::system_error(errno, std::system_category()); + } + + return 0; +#endif } // flags specifies which watch to remove; ignored if the loop doesn't support @@ -323,7 +409,7 @@ template class KqueueLoop : public Base struct kevent evt; EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ADD, 0, 0, userdata); - // TODO use EV_DISPATCH if available (not on OpenBSD) + // TODO use EV_DISPATCH if available (not on OpenBSD/OS X) if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) { throw new std::system_error(errno, std::system_category()); @@ -359,31 +445,22 @@ template class KqueueLoop : public Base removeSignalWatch_nolock(signo); } - // If events are pending, process an unspecified number of them. - // If no events are pending, wait until one event is received and - // process this event (and possibly any other events received - // simultaneously). - // If processing an event removes a watch, there is a possibility - // that the watched event will still be reported (if it has - // occurred) before pullEvents() returns. + private: + + // We actually need to check pending signals before polling the kqueue, since kqueue can + // count signals as they are delivered but the count is cleared when we poll the kqueue, + // meaning that signals might still be pending if they were queued multiple times at the + // last poll (since we report only one signal delivery at a time and the watch is + // automatically disabled each time). // - // do_wait - if false, returns immediately if no events are - // pending. - void pullEvents(bool do_wait) + // The check is not necessary on systems that don't queue signals. +void pull_signals() { - // We actually need to check pending signals, since - // kqueue can count signals as they are delivered but the count is - // cleared when we poll the kqueue, meaning that signals might still - // be pending if they were queued multiple times at the last poll. - +#if _POSIX_REALTIME_SIGNALS > 0 // TODO we should only poll for signals that *have* been reported // as being raised more than once prior via kevent, rather than all - // signals that have been registered - in many cases that will allow + // signals that have been registered - in many cases that may allow // us to skip the sigtimedwait call altogether. - - // The check is not necessary on systems that don't queue signals. - -#if _POSIX_REALTIME_SIGNALS > 0 { std::lock_guard guard(Base::lock); @@ -397,46 +474,44 @@ template class KqueueLoop : public Base sigdelset(&sigmask, rsigno); // TODO accumulate and disable multiple filters with a single kevents call // rather than disabling each individually - setFilterEnabled(EVFILT_SIGNAL, rsigno, false); + setFilterEnabled(EVFILT_SIGNAL, rsigno, sigdataMap[rsigno], false); } rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout); } } #endif - - struct kevent events[16]; - struct timespec ts; - ts.tv_sec = 0; - ts.tv_nsec = 0; - int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts); - if (r == -1 || r == 0) { - // signal or no events - return; - } - - processEvents(events, r); } - // If events are pending, process one of them. + public: + + // If events are pending, process an unspecified number of them. // If no events are pending, wait until one event is received and - // process this event. + // process this event (and possibly any other events received + // simultaneously). + // If processing an event removes a watch, there is a possibility + // that the watched event will still be reported (if it has + // occurred) before pullEvents() returns. // // do_wait - if false, returns immediately if no events are - // pending. - void pullOneEvent(bool do_wait) + // pending. + void pullEvents(bool do_wait) { - // TODO must check for pending signals as per pullEvents() - struct kevent events[1]; + pull_signals(); + + struct kevent events[16]; struct timespec ts; ts.tv_sec = 0; ts.tv_nsec = 0; - int r = kevent(kqfd, nullptr, 0, events, 1, do_wait ? nullptr : &ts); + int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts); if (r == -1 || r == 0) { // signal or no events return; } - - processEvents(events, r); + + do { + processEvents(events, r); + r = kevent(kqfd, nullptr, 0, events, 16, &ts); + } while (r > 0); } }; diff --git a/src/dasynq/dasynq-posixtimer.h b/src/dasynq/dasynq-posixtimer.h new file mode 100644 index 0000000..8131489 --- /dev/null +++ b/src/dasynq/dasynq-posixtimer.h @@ -0,0 +1,248 @@ +#include +#include + +#include +#include +#include + +#include "dasynq-config.h" +#include "dasynq-timerbase.h" + +namespace dasynq { + +// Timer implementation based on POSIX create_timer et al. +// May require linking with -lrt + +template class PosixTimerEvents : public timer_base +{ + private: + timer_queue_t real_timer_queue; + timer_queue_t mono_timer_queue; + + timer_t real_timer; + timer_t mono_timer; + + // Set the timeout to match the first timer in the queue (disable the timer if there are no + // active timers). + void set_timer_from_queue(timer_t &timer, timer_queue_t &timer_queue) + { + struct itimerspec newalarm; + + if (timer_queue.empty()) { + newalarm.it_value = {0, 0}; + newalarm.it_interval = {0, 0}; + timer_settime(timer, TIMER_ABSTIME, &newalarm, nullptr); + return; + } + + newalarm.it_interval = {0, 0}; + newalarm.it_value = timer_queue.get_root_priority(); + timer_settime(timer, TIMER_ABSTIME, &newalarm, nullptr); + } + + protected: + + using SigInfo = typename Base::SigInfo; + + template + bool receive_signal(T & loop_mech, SigInfo &siginfo, void *userdata) + { + if (siginfo.get_signo() == SIGALRM) { + struct timespec curtime; + + if (! real_timer_queue.empty()) { + clock_gettime(CLOCK_REALTIME, &curtime); + timer_base::process_timer_queue(real_timer_queue, curtime); + set_timer_from_queue(real_timer, real_timer_queue); + } + + if (! mono_timer_queue.empty()) { + clock_gettime(CLOCK_MONOTONIC, &curtime); + timer_base::process_timer_queue(mono_timer_queue, curtime); + set_timer_from_queue(mono_timer, mono_timer_queue); + } + + // loop_mech.rearmSignalWatch_nolock(SIGALRM); + return false; // don't disable signal watch + } + else { + return Base::receive_signal(loop_mech, siginfo, userdata); + } + } + + timer_queue_t &queue_for_clock(clock_type clock) + { + switch (clock) { + case clock_type::MONOTONIC: + return mono_timer_queue; + case clock_type::SYSTEM: + return real_timer_queue; + default: + DASYNQ_UNREACHABLE; + } + } + + timer_t &timer_for_clock(clock_type clock) + { + switch (clock) { + case clock_type::MONOTONIC: + return mono_timer; + case clock_type::SYSTEM: + return real_timer; + default: + DASYNQ_UNREACHABLE; + } + } + + public: + + template void init(T *loop_mech) + { + sigset_t sigmask; + sigprocmask(SIG_UNBLOCK, nullptr, &sigmask); + sigaddset(&sigmask, SIGALRM); + sigprocmask(SIG_SETMASK, &sigmask, nullptr); + loop_mech->addSignalWatch(SIGALRM, nullptr); + + struct sigevent timer_sigevent; + timer_sigevent.sigev_notify = SIGEV_SIGNAL; + timer_sigevent.sigev_signo = SIGALRM; + timer_sigevent.sigev_value.sival_int = 0; + + // Create the timers; throw std::system_error if we can't. + if (timer_create(CLOCK_REALTIME, &timer_sigevent, &real_timer) == 0) { + if (timer_create(CLOCK_MONOTONIC, &timer_sigevent, &mono_timer) != 0) { + timer_delete(real_timer); + throw std::system_error(errno, std::system_category()); + } + } + else { + throw std::system_error(errno, std::system_category()); + } + + Base::init(loop_mech); + } + + void addTimer(timer_handle_t &h, void *userdata, clock_type clock = clock_type::MONOTONIC) + { + std::lock_guard guard(Base::lock); + queue_for_clock(clock).allocate(h, userdata); + } + + void removeTimer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept + { + std::lock_guard guard(Base::lock); + removeTimer_nolock(timer_id, clock); + } + + void removeTimer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept + { + timer_queue_t &timer_queue = queue_for_clock(clock); + + if (timer_queue.is_queued(timer_id)) { + timer_queue.remove(timer_id); + } + timer_queue.deallocate(timer_id); + } + + // starts (if not started) a timer to timeout at the given time. Resets the expiry count to 0. + // enable: specifies whether to enable reporting of timeouts/intervals + void setTimer(timer_handle_t &timer_id, struct timespec &timeout, struct timespec &interval, + bool enable, clock_type clock = clock_type::MONOTONIC) noexcept + { + std::lock_guard guard(Base::lock); + + timer_queue_t &timer_queue = queue_for_clock(clock); + timer_t &timer = timer_for_clock(clock); + + auto &ts = timer_queue.node_data(timer_id); + ts.interval_time = interval; + ts.expiry_count = 0; + ts.enabled = enable; + + if (timer_queue.is_queued(timer_id)) { + // Already queued; alter timeout + if (timer_queue.set_priority(timer_id, timeout)) { + set_timer_from_queue(timer, timer_queue); + } + } + else { + if (timer_queue.insert(timer_id, timeout)) { + set_timer_from_queue(timer, timer_queue); + } + } + } + + // Set timer relative to current time: + void setTimerRel(timer_handle_t &timer_id, struct timespec &timeout, struct timespec &interval, + bool enable, clock_type clock = clock_type::MONOTONIC) noexcept + { + // TODO consider caching current time somehow; need to decide then when to update cached value. + struct timespec curtime; + int posix_clock_id = (clock == clock_type::MONOTONIC) ? CLOCK_MONOTONIC : CLOCK_REALTIME; + clock_gettime(posix_clock_id, &curtime); + curtime.tv_sec += timeout.tv_sec; + curtime.tv_nsec += timeout.tv_nsec; + if (curtime.tv_nsec > 1000000000) { + curtime.tv_nsec -= 1000000000; + curtime.tv_sec++; + } + setTimer(timer_id, curtime, interval, enable, clock); + } + + // Enables or disabling report of timeouts (does not stop timer) + void enableTimer(timer_handle_t &timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept + { + std::lock_guard guard(Base::lock); + enableTimer_nolock(timer_id, enable, clock); + } + + void enableTimer_nolock(timer_handle_t &timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept + { + timer_queue_t &timer_queue = queue_for_clock(clock); + + auto &node_data = timer_queue.node_data(timer_id); + auto expiry_count = node_data.expiry_count; + if (expiry_count != 0) { + node_data.expiry_count = 0; + Base::receiveTimerExpiry(timer_id, node_data.userdata, expiry_count); + } + else { + timer_queue.node_data(timer_id).enabled = enable; + } + } + + void stop_timer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept + { + std::lock_guard guard(Base::lock); + stop_timer_nolock(timer_id, clock); + } + + void stop_timer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept + { + timer_queue_t &timer_queue = queue_for_clock(clock); + timer_t &timer = timer_for_clock(clock); + + if (timer_queue.is_queued(timer_id)) { + bool was_first = (&timer_queue.get_root()) == &timer_id; + timer_queue.remove(timer_id); + if (was_first) { + set_timer_from_queue(timer, timer_queue); + } + } + } + + void get_time(timespec &ts, clock_type clock, bool force_update) noexcept + { + int posix_clock_id = (clock == clock_type::MONOTONIC) ? CLOCK_MONOTONIC : CLOCK_REALTIME; + clock_gettime(posix_clock_id, &ts); + } + + ~PosixTimerEvents() + { + timer_delete(mono_timer); + timer_delete(real_timer); + } +}; + +} diff --git a/src/dasynq/dasynq-timerfd.h b/src/dasynq/dasynq-timerfd.h index e660efe..8e49272 100644 --- a/src/dasynq/dasynq-timerfd.h +++ b/src/dasynq/dasynq-timerfd.h @@ -241,6 +241,12 @@ template class TimerFdEvents : public timer_base } } + void get_time(timespec &ts, clock_type clock, bool force_update) noexcept + { + int posix_clock_id = (clock == clock_type::MONOTONIC) ? CLOCK_MONOTONIC : CLOCK_REALTIME; + clock_gettime(posix_clock_id, &ts); + } + ~TimerFdEvents() { close(timerfd_fd); diff --git a/src/dasynq/dasynq.h b/src/dasynq/dasynq.h index 715841a..836139e 100644 --- a/src/dasynq/dasynq.h +++ b/src/dasynq/dasynq.h @@ -40,10 +40,20 @@ // Loop and LoopTraits defined already; used for testing #elif defined(DASYNQ_HAVE_KQUEUE) #include "dasynq-kqueue.h" +#if _POSIX_TIMERS > 0 +#include "dasynq-posixtimer.h" +namespace dasynq { + template using TimerEvents = PosixTimerEvents; +} +#else #include "dasynq-itimer.h" +namespace dasynq { + template using TimerEvents = ITimerEvents; +} +#endif #include "dasynq-childproc.h" namespace dasynq { - template using Loop = KqueueLoop>>>; + template using Loop = KqueueLoop>>>; using LoopTraits = KqueueTraits; } #elif defined(DASYNQ_HAVE_EPOLL) @@ -69,7 +79,7 @@ namespace dasynq { namespace dasynq { -#ifdef __APPLE__ +#if HAVE_PIPE2 == 0 inline int pipe2(int filedes[2], int flags) { if (pipe(filedes) == -1) { @@ -91,10 +101,10 @@ inline int pipe2(int filedes[2], int flags) #endif namespace dprivate { - class BaseWatcher; + class base_watcher; } -using PrioQueue = NaryHeap; +using PrioQueue = NaryHeap; inline namespace { constexpr int DEFAULT_PRIORITY = 50; @@ -138,7 +148,7 @@ namespace dprivate { template class child_proc_watcher_impl; template class timer_impl; - enum class WatchType + enum class watch_type_t { SIGNAL, FD, @@ -155,22 +165,25 @@ namespace dprivate { constexpr static int multi_watch = 4; // Represents a queued event notification. Various event watchers derive from this type. - class BaseWatcher + class base_watcher { template friend class EventDispatch; template class, typename> friend class dasynq::event_loop; - friend inline void basewatcher_set_active(BaseWatcher &watcher, bool active); - friend inline bool basewatcher_get_deleteme(const BaseWatcher &watcher); + friend inline void basewatcher_set_active(base_watcher &watcher, bool active); + friend inline bool basewatcher_get_deleteme(const base_watcher &watcher); + friend inline bool basewatcher_get_emulatefd(const base_watcher &watcher); protected: - WatchType watchType; + watch_type_t watchType; int active : 1; // currently executing handler? int deleteme : 1; // delete when handler finished? + int emulatefd : 1; // emulate file watch (by re-queueing) + int emulate_enabled : 1; // whether an emulated watch is enabled PrioQueue::handle_t heap_handle; int priority; - static void set_priority(BaseWatcher &p, int prio) + static void set_priority(base_watcher &p, int prio) { p.priority = prio; } @@ -182,16 +195,18 @@ namespace dprivate { { active = false; deleteme = false; + emulatefd = false; + emulate_enabled = false; PrioQueue::init_handle(heap_handle); priority = DEFAULT_PRIORITY; } - BaseWatcher(WatchType wt) noexcept : watchType(wt) { } + base_watcher(watch_type_t wt) noexcept : watchType(wt) { } virtual void dispatch(void *loop_ptr) noexcept { }; virtual void dispatch_second(void *loop_ptr) noexcept { } - virtual ~BaseWatcher() noexcept { } + virtual ~base_watcher() noexcept { } // Called when the watcher has been removed. // It is guaranteed by the caller that: @@ -204,26 +219,31 @@ namespace dprivate { } }; - inline void basewatcher_set_active(BaseWatcher &watcher, bool active) + inline void basewatcher_set_active(base_watcher &watcher, bool active) { watcher.active = active; } - inline bool basewatcher_get_deleteme(const BaseWatcher &watcher) + inline bool basewatcher_get_deleteme(const base_watcher &watcher) { return watcher.deleteme; } + inline bool basewatcher_get_emulatefd(const base_watcher &watcher) + { + return watcher.emulatefd; + } + // Base signal event - not part of public API template - class BaseSignalWatcher : public BaseWatcher + class base_signal_watcher : public base_watcher { friend class EventDispatch; template class, typename> friend class dasynq::event_loop; protected: typename Traits::SigInfo siginfo; - BaseSignalWatcher() : BaseWatcher(WatchType::SIGNAL) { } + base_signal_watcher() : base_watcher(watch_type_t::SIGNAL) { } public: using siginfo_t = typename Traits::SigInfo; @@ -231,7 +251,7 @@ namespace dprivate { }; template - class BaseFdWatcher : public BaseWatcher + class base_fd_watcher : public base_watcher { template friend class EventDispatch; template class, typename> friend class dasynq::event_loop; @@ -248,11 +268,11 @@ namespace dprivate { // the events that the watcher is currently watching (i.e. specifies which // halves of the Bidi watcher are enabled). - BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { } + base_fd_watcher() noexcept : base_watcher(watch_type_t::FD) { } }; template - class BaseBidiFdWatcher : public BaseFdWatcher + class base_bidi_fd_watcher : public base_fd_watcher { template friend class EventDispatch; template class, typename> friend class dasynq::event_loop; @@ -261,14 +281,14 @@ namespace dprivate { // The main instance is the "input" watcher only; we keep a secondary watcher // with a secondary set of flags for the "output" watcher: - BaseWatcher outWatcher {WatchType::SECONDARYFD}; + base_watcher outWatcher {watch_type_t::SECONDARYFD}; int read_removed : 1; // read watch removed? int write_removed : 1; // write watch removed? }; template - class BaseChildWatcher : public BaseWatcher + class base_child_watcher : public base_watcher { template friend class EventDispatch; template class, typename> friend class dasynq::event_loop; @@ -277,12 +297,12 @@ namespace dprivate { pid_t watch_pid; int child_status; - BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { } + base_child_watcher() : base_watcher(watch_type_t::CHILD) { } }; template - class BaseTimerWatcher : public BaseWatcher + class base_timer_watcher : public base_watcher { template friend class EventDispatch; template class, typename> friend class dasynq::event_loop; @@ -292,7 +312,7 @@ namespace dprivate { int intervals; clock_type clock; - BaseTimerWatcher() : BaseWatcher(WatchType::TIMER) + base_timer_watcher() : base_watcher(watch_type_t::TIMER) { init_timer_handle(timer_handle); } @@ -424,7 +444,7 @@ namespace dprivate { // Do standard post-dispatch processing for a watcher. This handles the case of removing or // re-queing watchers depending on the rearm type. - template void post_dispatch(Loop &loop, BaseWatcher *watcher, rearm rearmType) + template void post_dispatch(Loop &loop, base_watcher *watcher, rearm rearmType) { if (rearmType == rearm::REMOVE) { loop.getBaseLock().unlock(); @@ -432,7 +452,7 @@ namespace dprivate { loop.getBaseLock().lock(); } else if (rearmType == rearm::REQUEUE) { - loop.requeueWatcher(watcher); + loop.requeue_watcher(watcher); } } @@ -447,30 +467,30 @@ namespace dprivate { // queue data structure/pointer PrioQueue event_queue; - using BaseSignalWatcher = dasynq::dprivate::BaseSignalWatcher; - using BaseFdWatcher = dasynq::dprivate::BaseFdWatcher; - using BaseBidiFdWatcher = dasynq::dprivate::BaseBidiFdWatcher; - using BaseChildWatcher = dasynq::dprivate::BaseChildWatcher; - using BaseTimerWatcher = dasynq::dprivate::BaseTimerWatcher; + using BaseSignalWatcher = dasynq::dprivate::base_signal_watcher; + using BaseFdWatcher = dasynq::dprivate::base_fd_watcher; + using BaseBidiFdWatcher = dasynq::dprivate::base_bidi_fd_watcher; + using BaseChildWatcher = dasynq::dprivate::base_child_watcher; + using BaseTimerWatcher = dasynq::dprivate::base_timer_watcher; // Add a watcher into the queuing system (but don't queue it) // may throw: std::bad_alloc - void prepare_watcher(BaseWatcher *bwatcher) + void prepare_watcher(base_watcher *bwatcher) { event_queue.allocate(bwatcher->heap_handle, bwatcher); } - void queueWatcher(BaseWatcher *bwatcher) noexcept + void queueWatcher(base_watcher *bwatcher) noexcept { event_queue.insert(bwatcher->heap_handle, bwatcher->priority); } - bool isQueued(BaseWatcher *bwatcher) noexcept + bool isQueued(base_watcher *bwatcher) noexcept { return event_queue.is_queued(bwatcher->heap_handle); } - void dequeueWatcher(BaseWatcher *bwatcher) noexcept + void dequeueWatcher(base_watcher *bwatcher) noexcept { if (event_queue.is_queued(bwatcher->heap_handle)) { event_queue.remove(bwatcher->heap_handle); @@ -478,7 +498,7 @@ namespace dprivate { } // Remove watcher from the queueing system - void release_watcher(BaseWatcher *bwatcher) noexcept + void release_watcher(base_watcher *bwatcher) noexcept { event_queue.deallocate(bwatcher->heap_handle); } @@ -490,7 +510,7 @@ namespace dprivate { // Receive a signal; return true to disable signal watch or false to leave enabled template - bool receiveSignal(T &loop_mech, typename Traits::SigInfo & siginfo, void * userdata) noexcept + bool receive_signal(T &loop_mech, typename Traits::SigInfo & siginfo, void * userdata) noexcept { BaseSignalWatcher * bwatcher = static_cast(userdata); bwatcher->siginfo = siginfo; @@ -505,7 +525,7 @@ namespace dprivate { bfdw->event_flags |= flags; - BaseWatcher * bwatcher = bfdw; + base_watcher * bwatcher = bfdw; bool is_multi_watch = bfdw->watch_flags & multi_watch; if (is_multi_watch) { @@ -551,19 +571,19 @@ namespace dprivate { } // Pull a single event from the queue; returns nullptr if the queue is empty. - BaseWatcher * pullEvent() noexcept + base_watcher * pullEvent() noexcept { if (event_queue.empty()) { return nullptr; } auto & rhndl = event_queue.get_root(); - BaseWatcher *r = event_queue.node_data(rhndl); + base_watcher *r = event_queue.node_data(rhndl); event_queue.pull_root(); return r; } - void issueDelete(BaseWatcher *watcher) noexcept + void issueDelete(base_watcher *watcher) noexcept { // This is only called when the attention lock is held, so if the watcher is not // active/queued now, it cannot become active (and will not be reported with an event) @@ -602,7 +622,7 @@ namespace dprivate { watcher->read_removed = true; } - BaseWatcher *secondary = &(watcher->outWatcher); + base_watcher *secondary = &(watcher->outWatcher); if (secondary->active) { secondary->deleteme = true; release_watcher(watcher); @@ -639,7 +659,7 @@ class event_loop friend class dprivate::timer; friend void dprivate::post_dispatch(my_event_loop_t &loop, - dprivate::BaseWatcher *watcher, rearm rearmType); + dprivate::base_watcher *watcher, rearm rearmType); template friend class dprivate::fd_watcher_impl; template friend class dprivate::bidi_fd_watcher_impl; @@ -654,13 +674,13 @@ class event_loop template using EventDispatch = dprivate::EventDispatch; template using waitqueue = dprivate::waitqueue; template using waitqueue_node = dprivate::waitqueue_node; - using BaseWatcher = dprivate::BaseWatcher; - using BaseSignalWatcher = dprivate::BaseSignalWatcher; - using BaseFdWatcher = dprivate::BaseFdWatcher; - using BaseBidiFdWatcher = dprivate::BaseBidiFdWatcher; - using BaseChildWatcher = dprivate::BaseChildWatcher; - using BaseTimerWatcher = dprivate::BaseTimerWatcher; - using WatchType = dprivate::WatchType; + using BaseWatcher = dprivate::base_watcher; + using BaseSignalWatcher = dprivate::base_signal_watcher; + using BaseFdWatcher = dprivate::base_fd_watcher; + using BaseBidiFdWatcher = dprivate::base_bidi_fd_watcher; + using BaseChildWatcher = dprivate::base_child_watcher; + using BaseTimerWatcher = dprivate::base_timer_watcher; + using watch_type_t = dprivate::watch_type_t; Loop> loop_mech; @@ -734,19 +754,28 @@ class event_loop loop_mech.removeSignalWatch(signo); waitqueue_node qnode; - getAttnLock(qnode); + get_attn_lock(qnode); EventDispatch & ed = (EventDispatch &) loop_mech; ed.issueDelete(callBack); - releaseLock(qnode); + release_lock(qnode); } - void registerFd(BaseFdWatcher *callback, int fd, int eventmask, bool enabled) + void registerFd(BaseFdWatcher *callback, int fd, int eventmask, bool enabled, bool emulate = false) { loop_mech.prepare_watcher(callback); try { - loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, enabled); + if (! loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, enabled, emulate)) { + callback->emulatefd = true; + callback->emulate_enabled = enabled; + if (enabled) { + callback->event_flags = eventmask & IO_EVENTS; + if (eventmask & IO_EVENTS) { + requeue_watcher(callback); + } + } + } } catch (...) { loop_mech.release_watcher(callback); @@ -754,17 +783,38 @@ class event_loop } } - void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask) + void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask, bool emulate = false) { loop_mech.prepare_watcher(callback); try { loop_mech.prepare_watcher(&callback->outWatcher); try { if (LoopTraits::has_separate_rw_fd_watches) { - loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT); + int r = loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT, emulate); + if (r & IN_EVENTS) { + callback->emulatefd = true; + if (eventmask & IN_EVENTS) { + requeue_watcher(callback); + } + } + if (r & OUT_EVENTS) { + callback->outWatcher.emulatefd = true; + if (eventmask & OUT_EVENTS) { + requeue_watcher(&callback->outWatcher); + } + } } else { - loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT); + if (! loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, true, emulate)) { + callback->emulatefd = true; + callback->outWatcher.emulatefd = true; + if (eventmask & IN_EVENTS) { + requeue_watcher(callback); + } + if (eventmask & OUT_EVENTS) { + requeue_watcher(&callback->outWatcher); + } + } } } catch (...) { @@ -800,15 +850,21 @@ class event_loop void deregister(BaseFdWatcher *callback, int fd) noexcept { - loop_mech.removeFdWatch(fd, callback->watch_flags); + if (callback->emulatefd) { + auto & ed = (EventDispatch &) loop_mech; + ed.issueDelete(callback); + return; + } + loop_mech.removeFdWatch(fd, callback->watch_flags); + waitqueue_node qnode; - getAttnLock(qnode); + get_attn_lock(qnode); - EventDispatch & ed = (EventDispatch &) loop_mech; + auto & ed = (EventDispatch &) loop_mech; ed.issueDelete(callback); - releaseLock(qnode); + release_lock(qnode); } void deregister(BaseBidiFdWatcher *callback, int fd) noexcept @@ -821,12 +877,12 @@ class event_loop } waitqueue_node qnode; - getAttnLock(qnode); + get_attn_lock(qnode); EventDispatch & ed = (EventDispatch &) loop_mech; ed.issueDelete(callback); - releaseLock(qnode); + release_lock(qnode); } void reserveChildWatch(BaseChildWatcher *callback) @@ -874,12 +930,12 @@ class event_loop loop_mech.removeChildWatch(child); waitqueue_node qnode; - getAttnLock(qnode); + get_attn_lock(qnode); EventDispatch & ed = (EventDispatch &) loop_mech; ed.issueDelete(callback); - releaseLock(qnode); + release_lock(qnode); } void registerTimer(BaseTimerWatcher *callback, clock_type clock) @@ -925,20 +981,20 @@ class event_loop loop_mech.removeTimer(callback->timer_handle, clock); waitqueue_node qnode; - getAttnLock(qnode); + get_attn_lock(qnode); EventDispatch & ed = (EventDispatch &) loop_mech; ed.issueDelete(callback); - releaseLock(qnode); + release_lock(qnode); } - void dequeueWatcher(BaseWatcher *watcher) noexcept + void dequeue_watcher(BaseWatcher *watcher) noexcept { loop_mech.dequeueWatcher(watcher); } - void requeueWatcher(BaseWatcher *watcher) noexcept + void requeue_watcher(BaseWatcher *watcher) noexcept { loop_mech.queueWatcher(watcher); } @@ -946,7 +1002,7 @@ class event_loop // Acquire the attention lock (when held, ensures that no thread is polling the AEN // mechanism). This can be used to safely remove watches, since it is certain that // notification callbacks won't be run while the attention lock is held. - void getAttnLock(waitqueue_node &qnode) noexcept + void get_attn_lock(waitqueue_node &qnode) noexcept { std::unique_lock ulock(wait_lock); attn_waitqueue.queue(&qnode); @@ -962,7 +1018,7 @@ class event_loop // the attention lock). The poll-wait lock is used to prevent more than a single thread from // polling the event loop mechanism at a time; if this is not done, it is basically // impossible to safely deregister watches. - void getPollwaitLock(waitqueue_node &qnode) noexcept + void get_pollwait_lock(waitqueue_node &qnode) noexcept { std::unique_lock ulock(wait_lock); if (attn_waitqueue.isEmpty()) { @@ -979,7 +1035,7 @@ class event_loop } // Release the poll-wait/attention lock. - void releaseLock(waitqueue_node &qnode) noexcept + void release_lock(waitqueue_node &qnode) noexcept { std::unique_lock ulock(wait_lock); waitqueue_node * nhead = attn_waitqueue.unqueue(); @@ -1011,29 +1067,37 @@ class event_loop // Process rearm return for fd_watcher, including the primary watcher of a bidi_fd_watcher rearm processFdRearm(BaseFdWatcher * bfw, rearm rearmType, bool is_multi_watch) noexcept { + bool emulatedfd = static_cast(bfw)->emulatefd; + // Called with lock held if (is_multi_watch) { BaseBidiFdWatcher * bdfw = static_cast(bfw); - + if (rearmType == rearm::REMOVE) { bdfw->read_removed = 1; if (LoopTraits::has_separate_rw_fd_watches) { bdfw->watch_flags &= ~IN_EVENTS; - loop_mech.removeFdWatch_nolock(bdfw->watch_fd, IN_EVENTS); + if (! emulatedfd) { + loop_mech.removeFdWatch_nolock(bdfw->watch_fd, IN_EVENTS); + } return bdfw->write_removed ? rearm::REMOVE : rearm::NOOP; } else { if (! bdfw->write_removed) { if (bdfw->watch_flags & IN_EVENTS) { bdfw->watch_flags &= ~IN_EVENTS; - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags); + if (! emulatedfd) { + loop_mech.enableFdWatch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags); + } } return rearm::NOOP; } else { // both removed: actually remove - loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */); + if (! emulatedfd) { + loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */); + } return rearm::REMOVE; } } @@ -1041,37 +1105,65 @@ class event_loop else if (rearmType == rearm::DISARM) { bdfw->watch_flags &= ~IN_EVENTS; - if (! LoopTraits::has_separate_rw_fd_watches) { - int watch_flags = bdfw->watch_flags; - // without separate r/w watches, enableFdWatch actually sets - // which sides are enabled (i.e. can be used to disable): - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, - static_cast(bdfw), - (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); - } - else { - loop_mech.disableFdWatch_nolock(bdfw->watch_fd, IN_EVENTS); + if (! emulatedfd) { + if (! LoopTraits::has_separate_rw_fd_watches) { + int watch_flags = bdfw->watch_flags; + // without separate r/w watches, enableFdWatch actually sets + // which sides are enabled (i.e. can be used to disable): + loop_mech.enableFdWatch_nolock(bdfw->watch_fd, + static_cast(bdfw), + (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); + } + else { + loop_mech.disableFdWatch_nolock(bdfw->watch_fd, IN_EVENTS); + } } } else if (rearmType == rearm::REARM) { bdfw->watch_flags |= IN_EVENTS; - if (! LoopTraits::has_separate_rw_fd_watches) { - int watch_flags = bdfw->watch_flags; - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, - static_cast(bdfw), - (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); + if (! emulatedfd) { + if (! LoopTraits::has_separate_rw_fd_watches) { + int watch_flags = bdfw->watch_flags; + loop_mech.enableFdWatch_nolock(bdfw->watch_fd, + static_cast(bdfw), + (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); + } + else { + loop_mech.enableFdWatch_nolock(bdfw->watch_fd, + static_cast(bdfw), + IN_EVENTS | ONE_SHOT); + } } else { - loop_mech.enableFdWatch_nolock(bdfw->watch_fd, - static_cast(bdfw), - IN_EVENTS | ONE_SHOT); + rearmType = rearm::REQUEUE; + } + } + else if (rearmType == rearm::NOOP) { + if (bdfw->emulatefd) { + if (bdfw->watch_flags & IN_EVENTS) { + rearmType = rearm::REQUEUE; + } } } return rearmType; } else { // Not multi-watch: - if (rearmType == rearm::REARM) { + if (emulatedfd) { + if (rearmType == rearm::REARM) { + bfw->emulate_enabled = true; + rearmType = rearm::REQUEUE; + } + else if (rearmType == rearm::DISARM) { + bfw->emulate_enabled = false; + } + else if (rearmType == rearm::NOOP) { + if (bfw->emulate_enabled) { + rearmType = rearm::REQUEUE; + } + } + } + else if (rearmType == rearm::REARM) { loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw, (bfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); } @@ -1086,12 +1178,34 @@ class event_loop } // Process re-arm for the secondary (output) watcher in a Bi-direction Fd watcher. - rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, rearm rearmType) noexcept + rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, BaseWatcher * outw, rearm rearmType) noexcept { + bool emulatedfd = outw->emulatefd; + // Called with lock held - if (rearmType == rearm::REMOVE) { + if (emulatedfd) { + if (rearmType == rearm::REMOVE) { + bdfw->write_removed = 1; + bdfw->watch_flags &= ~OUT_EVENTS; + rearmType = bdfw->read_removed ? rearm::REMOVE : rearm::NOOP; + } + else if (rearmType == rearm::DISARM) { + bdfw->watch_flags &= ~OUT_EVENTS; + } + else if (rearmType == rearm::REARM) { + bdfw->watch_flags |= OUT_EVENTS; + rearmType = rearm::REQUEUE; + } + else if (rearmType == rearm::NOOP) { + if (bdfw->watch_flags & OUT_EVENTS) { + rearmType = rearm::REQUEUE; + } + } + return rearmType; + } + else if (rearmType == rearm::REMOVE) { bdfw->write_removed = 1; - + if (LoopTraits::has_separate_rw_fd_watches) { bdfw->watch_flags &= ~OUT_EVENTS; loop_mech.removeFdWatch_nolock(bdfw->watch_fd, OUT_EVENTS); @@ -1178,10 +1292,13 @@ class event_loop // (Above variables are initialised only to silence compiler warnings). - if (pqueue->watchType == WatchType::SECONDARYFD) { + if (pqueue->watchType == watch_type_t::SECONDARYFD) { // construct a pointer to the main watcher: char * rp = (char *)pqueue; + _Pragma ("GCC diagnostic push") + _Pragma ("GCC diagnostic ignored \"-Winvalid-offsetof\"") rp -= offsetof(BaseBidiFdWatcher, outWatcher); + _Pragma ("GCC diagnostic pop") bbfw = (BaseBidiFdWatcher *)rp; // issue a secondary dispatch: @@ -1198,7 +1315,6 @@ class event_loop return active; } - public: using mutex_t = T_Mutex; @@ -1214,32 +1330,45 @@ class event_loop template using child_proc_watcher_impl = dprivate::child_proc_watcher_impl; template using timer_impl = dprivate::timer_impl; + // Poll the event loop and process any pending events. If no events are pending, wait + // for and process at least one event. void run() noexcept { // Poll the mechanism first, in case high-priority events are pending: waitqueue_node qnode; - getPollwaitLock(qnode); + get_pollwait_lock(qnode); loop_mech.pullEvents(false); - releaseLock(qnode); + release_lock(qnode); while (! processEvents()) { // Pull events from the AEN mechanism and insert them in our internal queue: - getPollwaitLock(qnode); + get_pollwait_lock(qnode); loop_mech.pullEvents(true); - releaseLock(qnode); + release_lock(qnode); } } + // Poll the event loop and process any pending events void poll() noexcept { - // Poll the mechanism first, in case high-priority events are pending: waitqueue_node qnode; - getPollwaitLock(qnode); + get_pollwait_lock(qnode); loop_mech.pullEvents(false); - releaseLock(qnode); + release_lock(qnode); processEvents(); } + + // Get the current time corresponding to a specific clock. + // ts - the timespec variable to receive the time + // clock - specifies the clock + // force_update (default = false) - if true, the time returned will be updated from + // the system rather than being a previously cached result. It may be more + // accurate, but note that reading from a system clock may be relatively expensive. + void get_time(timespec &ts, clock_type clock, bool force_update = false) noexcept + { + loop_mech.get_time(ts, clock, force_update); + } }; typedef event_loop event_loop_n; @@ -1249,15 +1378,15 @@ namespace dprivate { // Posix signal event watcher template -class signal_watcher : private dprivate::BaseSignalWatcher +class signal_watcher : private dprivate::base_signal_watcher { template friend class signal_watcher_impl; - using BaseWatcher = dprivate::BaseWatcher; + using BaseWatcher = dprivate::base_watcher; using T_Mutex = typename EventLoop::mutex_t; public: - using siginfo_p = typename dprivate::BaseSignalWatcher::siginfo_p; + using siginfo_p = typename dprivate::base_signal_watcher::siginfo_p; // Register this watcher to watch the specified signal. // If an attempt is made to register with more than one event loop at @@ -1338,11 +1467,11 @@ class signal_watcher_impl : public signal_watcher // Posix file descriptor event watcher template -class fd_watcher : private dprivate::BaseFdWatcher +class fd_watcher : private dprivate::base_fd_watcher { template friend class fd_watcher_impl; - using BaseWatcher = dprivate::BaseWatcher; + using BaseWatcher = dprivate::base_watcher; using T_Mutex = typename EventLoop::mutex_t; protected: @@ -1377,7 +1506,16 @@ class fd_watcher : private dprivate::BaseFdWatcher this->priority = prio; this->watch_fd = fd; this->watch_flags = flags; - eloop.registerFd(this, fd, flags, enabled); + eloop.registerFd(this, fd, flags, enabled, true); + } + + void add_watch_noemu(EventLoop &eloop, int fd, int flags, bool enabled = true, int prio = DEFAULT_PRIORITY) + { + BaseWatcher::init(); + this->priority = prio; + this->watch_fd = fd; + this->watch_flags = flags; + eloop.registerFd(this, fd, flags, enabled, false); } int get_watched_fd() @@ -1401,9 +1539,14 @@ class fd_watcher : private dprivate::BaseFdWatcher void set_enabled(EventLoop &eloop, bool enable) noexcept { std::lock_guard guard(eloop.getBaseLock()); - eloop.setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable); + if (this->emulatefd) { + this->emulate_enabled = enable; + } + else { + eloop.setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable); + } if (! enable) { - eloop.dequeueWatcher(this); + eloop.dequeue_watcher(this); } } @@ -1412,13 +1555,13 @@ class fd_watcher : private dprivate::BaseFdWatcher template static fd_watcher *add_watch(EventLoop &eloop, int fd, int flags, T watchHndlr) { - class LambdaFdWatcher : public fd_watcher_impl + class lambda_fd_watcher : public fd_watcher_impl { private: T watchHndlr; public: - LambdaFdWatcher(T watchHandlr_a) : watchHndlr(watchHandlr_a) + lambda_fd_watcher(T watchHandlr_a) : watchHndlr(watchHandlr_a) { // } @@ -1434,7 +1577,7 @@ class fd_watcher : private dprivate::BaseFdWatcher } }; - LambdaFdWatcher * lfd = new LambdaFdWatcher(watchHndlr); + lambda_fd_watcher * lfd = new lambda_fd_watcher(watchHndlr); lfd->add_watch(eloop, fd, flags); return lfd; } @@ -1448,6 +1591,10 @@ class fd_watcher_impl : public fd_watcher void dispatch(void *loop_ptr) noexcept override { EventLoop &loop = *static_cast(loop_ptr); + + // In case emulating, clear enabled here; REARM or explicit set_enabled will re-enable. + this->emulate_enabled = false; + loop.getBaseLock().unlock(); auto rearmType = static_cast(this)->fd_event(loop, this->watch_fd, this->event_flags); @@ -1474,11 +1621,11 @@ class fd_watcher_impl : public fd_watcher // This watcher type has two event notification methods which can both potentially be // active at the same time. template -class bidi_fd_watcher : private dprivate::BaseBidiFdWatcher +class bidi_fd_watcher : private dprivate::base_bidi_fd_watcher { template friend class bidi_fd_watcher_impl; - using BaseWatcher = dprivate::BaseWatcher; + using BaseWatcher = dprivate::base_watcher; using T_Mutex = typename EventLoop::mutex_t; void set_watch_enabled(EventLoop &eloop, bool in, bool b) @@ -1491,22 +1638,23 @@ class bidi_fd_watcher : private dprivate::BaseBidiFdWatcherwatch_flags &= ~events; } - if (EventLoop::loop_traits_t::has_separate_rw_fd_watches) { - dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher; - eloop.setFdEnabled_nolock(watcher, this->watch_fd, events | ONE_SHOT, b); - if (! b) { - eloop.dequeueWatcher(watcher); + + dprivate::base_watcher * watcher = in ? this : &this->outWatcher; + + if (! basewatcher_get_emulatefd(*watcher)) { + if (EventLoop::loop_traits_t::has_separate_rw_fd_watches) { + eloop.setFdEnabled_nolock(watcher, this->watch_fd, events | ONE_SHOT, b); } - } - else { - eloop.setFdEnabled_nolock(this, this->watch_fd, - (this->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT, - (this->watch_flags & (IN_EVENTS | OUT_EVENTS)) != 0); - if (! b) { - dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher; - eloop.dequeueWatcher(watcher); + else { + eloop.setFdEnabled_nolock(this, this->watch_fd, + (this->watch_flags & IO_EVENTS) | ONE_SHOT, + (this->watch_flags & IO_EVENTS) != 0); } } + + if (! b) { + eloop.dequeue_watcher(watcher); + } } public: @@ -1536,13 +1684,14 @@ class bidi_fd_watcher : private dprivate::BaseBidiFdWatcher guard(eloop.getBaseLock()); - if (EventLoop::loop_traits_t::has_separate_rw_fd_watches) { + bool use_emulation = this->emulatefd || basewatcher_get_emulatefd(this->outWatcher); + if (use_emulation || EventLoop::loop_traits_t::has_separate_rw_fd_watches) { set_watch_enabled(eloop, true, (newFlags & IN_EVENTS) != 0); set_watch_enabled(eloop, false, (newFlags & OUT_EVENTS) != 0); } else { this->watch_flags = (this->watch_flags & ~IO_EVENTS) | newFlags; - eloop.setFdEnabled((dprivate::BaseWatcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true); + eloop.setFdEnabled((dprivate::base_watcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true); } } @@ -1560,9 +1709,22 @@ class bidi_fd_watcher : private dprivate::BaseBidiFdWatcherwrite_removed = false; this->priority = inprio; this->set_priority(this->outWatcher, outprio); - eloop.registerFd(this, fd, flags); + eloop.registerFd(this, fd, flags, true); } - + + void add_watch_noemu(EventLoop &eloop, int fd, int flags, int inprio = DEFAULT_PRIORITY, int outprio = DEFAULT_PRIORITY) + { + BaseWatcher::init(); + this->outWatcher.BaseWatcher::init(); + this->watch_fd = fd; + this->watch_flags = flags | dprivate::multi_watch; + this->read_removed = false; + this->write_removed = false; + this->priority = inprio; + this->set_priority(this->outWatcher, outprio); + eloop.registerFd(this, fd, flags, false); + } + int get_watched_fd() { return this->watch_fd; @@ -1626,6 +1788,7 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher void dispatch(void *loop_ptr) noexcept override { EventLoop &loop = *static_cast(loop_ptr); + this->emulate_enabled = false; loop.getBaseLock().unlock(); auto rearmType = static_cast(this)->read_ready(loop, this->watch_fd); @@ -1665,7 +1828,7 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher rearmType = rearm::REMOVE; } - rearmType = loop.processSecondaryRearm(this, rearmType); + rearmType = loop.processSecondaryRearm(this, &outwatcher, rearmType); if (rearmType == rearm::REQUEUE) { post_dispatch(loop, &outwatcher, rearmType); @@ -1679,11 +1842,11 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher // Child process event watcher template -class child_proc_watcher : private dprivate::BaseChildWatcher +class child_proc_watcher : private dprivate::base_child_watcher { template friend class child_proc_watcher_impl; - using BaseWatcher = dprivate::BaseWatcher; + using BaseWatcher = dprivate::base_watcher; using T_Mutex = typename EventLoop::mutex_t; public: @@ -1738,8 +1901,11 @@ class child_proc_watcher : private dprivate::BaseChildWatcherpriority = prio; + if (EventLoop::loop_traits_t::supports_childwatch_reservation) { // Reserve a watch, fork, then claim reservation if (! from_reserved) { @@ -1764,6 +1930,7 @@ class child_proc_watcher : private dprivate::BaseChildWatcherwatch_pid = child; eloop.registerReservedChild_nolock(this, child); lock.unlock(); return child; @@ -1801,6 +1968,7 @@ class child_proc_watcher : private dprivate::BaseChildWatcherwatch_pid = child; eloop.registerChild(this, child); // Continue in child (it doesn't matter what is written): @@ -1827,7 +1995,7 @@ class child_proc_watcher_impl : public child_proc_watcher EventLoop &loop = *static_cast(loop_ptr); loop.getBaseLock().unlock(); - auto rearmType = static_cast(this)->child_status(loop, this->watch_pid, this->child_status); + auto rearmType = static_cast(this)->status_change(loop, this->watch_pid, this->child_status); loop.getBaseLock().lock(); @@ -1846,15 +2014,16 @@ class child_proc_watcher_impl : public child_proc_watcher }; template -class timer : private BaseTimerWatcher +class timer : private base_timer_watcher { template friend class timer_impl; - using base_t = BaseTimerWatcher; + using base_t = base_timer_watcher; public: void add_timer(EventLoop &eloop, clock_type clock = clock_type::MONOTONIC, int prio = DEFAULT_PRIORITY) { + base_watcher::init(); this->priority = prio; this->clock = clock; eloop.registerTimer(this, clock);