From 2ea73e614cbc22ad4e9296d1f1fd4982159ea8de Mon Sep 17 00:00:00 2001 From: Davin McCall Date: Sun, 12 Mar 2017 23:16:34 +0000 Subject: [PATCH] Update dasynq library, and make appropriate changes in dinit code. --- src/control.h | 22 +- src/dasynq/dasynq-binaryheap.h | 256 +++++++++++++++ src/dasynq/dasynq-childproc.h | 12 +- src/dasynq/dasynq-epoll.h | 10 +- src/dasynq/dasynq-kqueue.h | 19 +- src/dasynq/dasynq-mutex.h | 6 +- src/dasynq/dasynq-svec.h | 163 ++++++++++ src/dasynq/dasynq-timerfd.h | 236 ++++++++++++++ src/dasynq/dasynq.h | 578 ++++++++++++++++++++++++--------- src/dinit-log.cc | 22 +- src/dinit.cc | 14 +- src/service.cc | 10 +- src/service.h | 4 +- 13 files changed, 1145 insertions(+), 207 deletions(-) create mode 100644 src/dasynq/dasynq-binaryheap.h create mode 100644 src/dasynq/dasynq-svec.h create mode 100644 src/dasynq/dasynq-timerfd.h diff --git a/src/control.h b/src/control.h index 6bfd790..9f29633 100644 --- a/src/control.h +++ b/src/control.h @@ -19,13 +19,13 @@ // Control connection for dinit using namespace dasynq; -using EventLoop_t = EventLoop; +using EventLoop_t = event_loop; class ControlConn; class ControlConnWatcher; // forward-declaration of callback: -static Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents); +static rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents); // Pointer to the control connection that is listening for rollback completion extern ControlConn * rollback_handler_conn; @@ -49,14 +49,14 @@ class ServiceRecord; class ControlConnWatcher : public EventLoop_t::BidiFdWatcher { - inline Rearm receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept; + inline rearm receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept; - Rearm readReady(EventLoop_t &loop, int fd) noexcept override + rearm readReady(EventLoop_t &loop, int fd) noexcept override { return receiveEvent(loop, fd, IN_EVENTS); } - Rearm writeReady(EventLoop_t &loop, int fd) noexcept override + rearm writeReady(EventLoop_t &loop, int fd) noexcept override { return receiveEvent(loop, fd, OUT_EVENTS); } @@ -76,7 +76,7 @@ class ControlConnWatcher : public EventLoop_t::BidiFdWatcher } }; -inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept +inline rearm ControlConnWatcher::receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept { return control_conn_cb(&loop, this, flags); } @@ -84,7 +84,7 @@ inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t &loop, int fd, int fla class ControlConn : private ServiceListener { - friend Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents); + friend rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents); ControlConnWatcher iob; EventLoop_t *loop; @@ -216,24 +216,24 @@ class ControlConn : private ServiceListener }; -static Rearm control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents) +static rearm control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents) { char * cc_addr = (reinterpret_cast(watcher)) - offsetof(ControlConn, iob); ControlConn *conn = reinterpret_cast(cc_addr); if (revents & IN_EVENTS) { if (conn->dataReady()) { delete conn; - return Rearm::REMOVED; + return rearm::REMOVED; } } if (revents & OUT_EVENTS) { if (conn->sendData()) { delete conn; - return Rearm::REMOVED; + return rearm::REMOVED; } } - return Rearm::NOOP; + return rearm::NOOP; } #endif diff --git a/src/dasynq/dasynq-binaryheap.h b/src/dasynq/dasynq-binaryheap.h new file mode 100644 index 0000000..aafd53d --- /dev/null +++ b/src/dasynq/dasynq-binaryheap.h @@ -0,0 +1,256 @@ +#ifndef DASYNC_BINARYHEAP_H_INCLUDED +#define DASYNC_BINARYHEAP_H_INCLUDED + +#include "dasynq-svec.h" +#include +#include +#include + +namespace dasynq { + +/** + * Priority queue implementation based on a binary heap. + * + * Heap entry "handles" maintain an index into the heap. When the position of a node in the heap + * changes, its handle must be updated. + * + * T : node data type + * P : priority type (eg int) + * Compare : functional object type to compare priorities + */ +template > +class BinaryHeap +{ + public: + struct handle_t; + + private: + + // Actual heap node + class HeapNode + { + public: + P data; + handle_t * hnd_p; + + HeapNode(handle_t * hnd, const P &odata) : data(odata), hnd_p(hnd) + { + // nothing to do + } + }; + + svector hvec; + + using hindex_t = typename decltype(hvec)::size_type; + + int root_node = -1; + hindex_t num_nodes = 0; + + public: + + // Handle to an element on the heap in the node buffer; also contains the data associated + // with the node. (Alternative implementation would be to store the heap data in a + // separate container, and have the handle be an index into that container). + struct handle_t + { + T hd; + hindex_t heap_index; + }; + + // Initialise a handle (if it does not have a suitable constructor). Need not do anything + // but may store a sentinel value to mark the handle as inactive. It should not be + // necessary to call this, really. + static void init_handle(handle_t &h) noexcept + { + } + + private: + + // In hindsight, I probably could have used std::priority_queue rather than re-implementing a + // queue here. However, priority_queue doesn't expose the container (except as a protected + // member) and so would make it more complicated to reserve storage. It would also have been + // necessary to override assignment between elements to correctly update the reference in the + // handle. + + bool bubble_down() + { + return bubble_down(hvec.size() - 1); + } + + // Bubble a newly added timer down to the correct position + bool bubble_down(hindex_t pos) + { + // int pos = v.size() - 1; + Compare lt; + while (pos > 0) { + hindex_t parent = (pos - 1) / 2; + if (! lt(hvec[pos].data, hvec[parent].data)) { + break; + } + + std::swap(hvec[pos], hvec[parent]); + std::swap(hvec[pos].hnd_p->heap_index, hvec[parent].hnd_p->heap_index); + pos = parent; + } + + return pos == 0; + } + + void bubble_up(hindex_t pos = 0) + { + Compare lt; + hindex_t rmax = hvec.size(); + hindex_t max = (rmax - 1) / 2; + + while (pos <= max) { + hindex_t selchild; + hindex_t lchild = pos * 2 + 1; + hindex_t rchild = lchild + 1; + if (rchild >= rmax) { + selchild = lchild; + } + else { + // select the sooner of lchild and rchild + selchild = lt(hvec[lchild].data, hvec[rchild].data) ? lchild : rchild; + } + + if (! lt(hvec[selchild].data, hvec[pos].data)) { + break; + } + + std::swap(hvec[selchild].hnd_p->heap_index, hvec[pos].hnd_p->heap_index); + std::swap(hvec[selchild], hvec[pos]); + pos = selchild; + } + } + + void remove_h(hindex_t hidx) + { + // bvec[hvec[hidx].data_index].heap_index = -1; + hvec[hidx].hnd_p->heap_index = -1; + if (hvec.size() != hidx + 1) { + // replace the first element with the last: + // bvec[hvec.back().data_index].heap_index = hidx; + hvec.back().hnd_p->heap_index = hidx; + hvec[hidx] = hvec.back(); + hvec.pop_back(); + + // Now bubble up: + bubble_up(hidx); + } + else { + hvec.pop_back(); + } + } + + public: + + T & node_data(handle_t & index) noexcept + { + return index.hd; + } + + // Allocate a slot, but do not incorporate into the heap: + // u... : parameters for data constructor T::T(...) + template void allocate(handle_t & hnd, U... u) + { + new (& hnd.hd) T(u...); + hnd.heap_index = -1; + constexpr hindex_t max_allowed = std::numeric_limits::is_signed ? + std::numeric_limits::max() : ((hindex_t) - 2); + + if (num_nodes == max_allowed) { + throw std::bad_alloc(); + } + + num_nodes++; + + if (__builtin_expect(hvec.capacity() < num_nodes, 0)) { + hindex_t half_point = max_allowed / 2; + try { + if (__builtin_expect(num_nodes < half_point, 1)) { + hvec.reserve(num_nodes * 2); + } + else { + hvec.reserve(max_allowed); + } + } + catch (std::bad_alloc &e) { + hvec.reserve(num_nodes); + } + } + } + + // Deallocate a slot + void deallocate(handle_t & index) noexcept + { + num_nodes--; + + // shrink the capacity of hvec if num_nodes is sufficiently less than + // its current capacity: + if (num_nodes < hvec.capacity() / 4) { + hvec.shrink_to(num_nodes * 2); + } + } + + bool insert(handle_t & hnd, P pval = P()) noexcept + { + hnd.heap_index = hvec.size(); + hvec.emplace_back(&hnd, pval); + return bubble_down(); + } + + // Get the root node handle. (Returns a handle_t or reference to handle_t). + handle_t & get_root() + { + return * hvec[0].hnd_p; + } + + P &get_root_priority() + { + return hvec[0].data; + } + + void pull_root() + { + remove_h(0); + } + + void remove(handle_t & hnd) + { + remove_h(hnd.heap_index); + } + + bool empty() + { + return hvec.empty(); + } + + bool is_queued(handle_t hnd) + { + return hnd.heap_index != (hindex_t) -1; + } + + // Set a node priority. Returns true iff the node becomes the root node (and wasn't before). + bool set_priority(handle_t & hnd, const P& p) + { + int heap_index = hnd.heap_index; + + Compare lt; + if (lt(hvec[heap_index].data, p)) { + // Increase key + hvec[heap_index].data = p; + bubble_up(heap_index); + return false; + } + else { + // Decrease key + hvec[heap_index].data = p; + return bubble_down(heap_index); + } + } +}; + +} + +#endif diff --git a/src/dasynq/dasynq-childproc.h b/src/dasynq/dasynq-childproc.h index af33e26..722ebd8 100644 --- a/src/dasynq/dasynq-childproc.h +++ b/src/dasynq/dasynq-childproc.h @@ -79,7 +79,7 @@ class pid_map } }; -namespace { +inline namespace { void sigchld_handler(int signum) { // If SIGCHLD has no handler (is ignored), SIGCHLD signals will @@ -94,11 +94,12 @@ template class ChildProcEvents : public Base { private: pid_map child_waiters; - - using SigInfo = typename Base::SigInfo; protected: - void receiveSignal(SigInfo &siginfo, void *userdata) + using SigInfo = typename Base::SigInfo; + + template + bool receiveSignal(T & loop_mech, SigInfo &siginfo, void *userdata) { if (siginfo.get_signo() == SIGCHLD) { int status; @@ -109,9 +110,10 @@ template class ChildProcEvents : public Base Base::receiveChildStat(child, status, ent.second); } } + return false; // leave signal watch enabled } else { - Base::receiveSignal(siginfo, userdata); + return Base::receiveSignal(loop_mech, siginfo, userdata); } } diff --git a/src/dasynq/dasynq-epoll.h b/src/dasynq/dasynq-epoll.h index 763c40a..e92deb6 100644 --- a/src/dasynq/dasynq-epoll.h +++ b/src/dasynq/dasynq-epoll.h @@ -97,14 +97,12 @@ template class EpollLoop : public Base while (true) { int r = read(sigfd, &siginfo.info, sizeof(siginfo.info)); if (r == -1) break; - if (siginfo.get_signo() != SIGCHLD) { - // TODO remove the special exception for SIGCHLD? - sigdelset(&sigmask, siginfo.get_signo()); - } auto iter = sigdataMap.find(siginfo.get_signo()); if (iter != sigdataMap.end()) { void *userdata = (*iter).second; - Base::receiveSignal(siginfo, userdata); + if (Base::receiveSignal(*this, siginfo, userdata)) { + sigdelset(&sigmask, siginfo.get_signo()); + } } } signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); @@ -145,7 +143,7 @@ template class EpollLoop : public Base } } - // flags: IN_EVENTS | OUT_EVENTS + // flags: IN_EVENTS | OUT_EVENTS | ONE_SHOT void addFdWatch(int fd, void *userdata, int flags, bool enabled = true) { struct epoll_event epevent; diff --git a/src/dasynq/dasynq-kqueue.h b/src/dasynq/dasynq-kqueue.h index ad44708..4682f19 100644 --- a/src/dasynq/dasynq-kqueue.h +++ b/src/dasynq/dasynq-kqueue.h @@ -132,16 +132,13 @@ template class KqueueLoop : public Base timeout.tv_sec = 0; timeout.tv_nsec = 0; if (sigtimedwait(&sset, &siginfo.info, &timeout) > 0) { - Base::receiveSignal(siginfo, (void *)events[i].udata); - } - - if (events[i].ident != SIGCHLD) { - sigdelset(&sigmask, events[i].ident); - events[i].flags = EV_DISABLE; - } - else { - // TODO can we remove this SIGCHLD hack? - events[i].flags = EV_ENABLE; + if (Base::receiveSignal(*this, siginfo, (void *)events[i].udata)) { + sigdelset(&sigmask, events[i].ident); + events[i].flags = EV_DISABLE; + } + else { + events[i].flags = EV_ENABLE; + } } } else if (events[i].filter == EVFILT_READ || events[i].filter == EVFILT_WRITE) { @@ -348,7 +345,7 @@ template class KqueueLoop : public Base // rather than disabling each individually setFilterEnabled(EVFILT_SIGNAL, rsigno, false); } - Base::receiveSignal(siginfo, sigdataMap[rsigno]); + Base::receiveSignal(*this, siginfo, sigdataMap[rsigno]); rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout); } } diff --git a/src/dasynq/dasynq-mutex.h b/src/dasynq/dasynq-mutex.h index 212e6e2..9cf44b3 100644 --- a/src/dasynq/dasynq-mutex.h +++ b/src/dasynq/dasynq-mutex.h @@ -1,5 +1,5 @@ -#ifndef D_MUTEX_H_INCLUDED -#define D_MUTEX_H_INCLUDED +#ifndef DASYNQ_MUTEX_H_INCLUDED +#define DASYNQ_MUTEX_H_INCLUDED //#include #include @@ -40,7 +40,7 @@ using DMutex = std::mutex; // A "null" mutex, for which locking / unlocking actually does nothing. class NullMutex { - EMPTY_BODY + DASYNQ_EMPTY_BODY public: void lock() { } diff --git a/src/dasynq/dasynq-svec.h b/src/dasynq/dasynq-svec.h new file mode 100644 index 0000000..eb4259e --- /dev/null +++ b/src/dasynq/dasynq-svec.h @@ -0,0 +1,163 @@ +#ifndef DASYNQ_SVEC_H_INCLUDED +#define DASYNC_SVEC_H_INCLUDED + +#include +#include +#include + +// Vector with possibility to shrink capacity arbitrarily + +namespace dasynq { + +template +class svector +{ + private: + T * array; + size_t size_v; + size_t capacity_v; + + void check_capacity() + { + if (size_v == capacity_v) { + // double capacity now: + if (capacity_v == 0) capacity_v = 1; + T * new_array = (T *) std::malloc(capacity_v * 2 * sizeof(T)); + if (new_array == nullptr) { + throw std::bad_alloc(); + } + for (size_t i = 0; i < size_v; i++) { + new (&new_array[i]) T(std::move(array[i])); + array[i].T::~T(); + } + std::free(array); + array = new_array; + capacity_v *= 2; + } + } + + public: + using size_type = size_t; + + svector() : array(nullptr), size_v(0), capacity_v(0) + { + + } + + svector(const svector &other) + { + capacity_v = other.size_v; + size_v = other.size_v; + array = (T *) std::malloc(capacity_v * sizeof(T)); + if (array == nullptr) { + throw std::bad_alloc(); + } + for (size_t i = 0; i < size_v; i++) { + new (&array[i]) T(other[i]); + } + } + + ~svector() + { + for (size_t i = 0; i < size_v; i++) { + array[i].T::~T(); + } + std::free(array); + } + + void push_back(const T &t) + { + check_capacity(); + new (&array[size_v]) T(t); + size_v++; + } + + void push_back(T &&t) + { + check_capacity(); + new (&array[size_v]) T(t); + size_v++; + } + + template + void emplace_back(U... args) + { + check_capacity(); + new (&array[size_v]) T(args...); + size_v++; + } + + void pop_back() + { + size_v--; + } + + T &operator[](size_t index) + { + return array[index]; + } + + const T &operator[](size_t index) const + { + return array[index]; + } + + size_t size() const + { + return size_v; + } + + size_t capacity() const + { + return capacity_v; + } + + bool empty() const + { + return size_v == 0; + } + + void reserve(size_t amount) + { + if (capacity_v < amount) { + T * new_array = (T *) std::malloc(amount * sizeof(T)); + if (new_array == nullptr) { + throw std::bad_alloc(); + } + for (size_t i = 0; i < size_v; i++) { + new (&new_array[i]) T(std::move(array[i])); + array[i].T::~T(); + } + std::free(array); + array = new_array; + capacity_v = amount; + } + } + + void shrink_to(size_t amount) + { + if (capacity_v > amount) { + T * new_array = (T *) std::malloc(amount * sizeof(T)); + if (new_array == nullptr) { + return; + } + for (size_t i = 0; i < size_v; i++) { + new (&new_array[i]) T(std::move(array[i])); + array[i].T::~T(); + } + std::free(array); + array = new_array; + capacity_v = amount; + } + } + + T &back() + { + return array[size_v - 1]; + } +}; + + +} // namespace + +#endif diff --git a/src/dasynq/dasynq-timerfd.h b/src/dasynq/dasynq-timerfd.h new file mode 100644 index 0000000..db597b8 --- /dev/null +++ b/src/dasynq/dasynq-timerfd.h @@ -0,0 +1,236 @@ +#include +#include + +#include +#include + +#include "dasynq-binaryheap.h" + +namespace dasynq { + +// We could use one timerfd per timer, but then we need to differentiate timer +// descriptors from regular file descriptors when events are reported by the loop +// mechanism so that we can correctly report a timer event or fd event. + +// With a file descriptor or signal, we can use the item itself as the identifier for +// adding/removing watches. For timers, it's more complicated. When we add a timer, +// we are given a handle; we need to use this to modify the watch. We delegate the +// process of allocating a handle to a priority heap implementation (BinaryHeap). + + +class TimerData +{ + public: + // initial time? + struct timespec interval_time; // interval (if 0, one-off timer) + int expiry_count; // number of times expired + bool enabled; // whether timer reports events + void *userdata; + + TimerData(void *udata = nullptr) : interval_time({0,0}), expiry_count(0), enabled(true), userdata(udata) + { + // constructor + } +}; + +class CompareTimespec +{ + public: + bool operator()(const struct timespec &a, const struct timespec &b) + { + if (a.tv_sec < b.tv_sec) { + return true; + } + + if (a.tv_sec == b.tv_sec) { + return a.tv_nsec < b.tv_nsec; + } + + return false; + } +}; + +using timer_handle_t = BinaryHeap::handle_t; + +static void init_timer_handle(timer_handle_t &hnd) noexcept +{ + BinaryHeap::init_handle(hnd); +} + + +template class TimerFdEvents : public Base +{ + private: + int timerfd_fd = -1; + + BinaryHeap timer_queue; + + + static int divide_timespec(const struct timespec &num, const struct timespec &den) + { + // TODO + return 0; + } + + // Set the timerfd timeout to match the first timer in the queue (disable the timerfd + // if there are no active timers). + void set_timer_from_queue() + { + struct itimerspec newtime; + if (timer_queue.empty()) { + newtime.it_value = {0, 0}; + newtime.it_interval = {0, 0}; + } + else { + newtime.it_value = timer_queue.get_root_priority(); + newtime.it_interval = {0, 0}; + } + timerfd_settime(timerfd_fd, TFD_TIMER_ABSTIME, &newtime, nullptr); + } + + public: + template + void receiveFdEvent(T &loop_mech, typename Base::FD_r fd_r, void * userdata, int flags) + { + if (userdata == &timerfd_fd) { + struct timespec curtime; + clock_gettime(CLOCK_MONOTONIC, &curtime); // in theory, can't fail on Linux + + // Peek timer queue; calculate difference between current time and timeout + struct timespec * timeout = &timer_queue.get_root_priority(); + while (timeout->tv_sec < curtime.tv_sec || (timeout->tv_sec == curtime.tv_sec && + timeout->tv_nsec <= curtime.tv_nsec)) { + // Increment expiry count + timer_queue.node_data(timer_queue.get_root()).expiry_count++; + // (a periodic timer may have overrun; calculated below). + + auto thandle = timer_queue.get_root(); + TimerData &data = timer_queue.node_data(thandle); + timespec &interval = data.interval_time; + if (interval.tv_sec == 0 && interval.tv_nsec == 0) { + // Non periodic timer + timer_queue.pull_root(); + if (data.enabled) { + int expiry_count = data.expiry_count; + data.expiry_count = 0; + Base::receiveTimerExpiry(thandle, data.userdata, expiry_count); + } + if (timer_queue.empty()) { + break; + } + } + else { + // Periodic timer TODO + // First calculate the overrun in time: + /* + struct timespec diff; + diff.tv_sec = curtime.tv_sec - timeout->tv_sec; + diff.tv_nsec = curtime.tv_nsec - timeout->tv_nsec; + if (diff.tv_nsec < 0) { + diff.tv_nsec += 1000000000; + diff.tv_sec--; + } + */ + // Now we have to divide the time overrun by the period to find the + // interval overrun. This requires a division of a value not representable + // as a long... + // TODO use divide_timespec + // TODO better not to remove from queue maybe, but instead mark as inactive, + // adjust timeout, and bubble into correct position + // call Base::receieveTimerEvent + // TODO + } + + // repeat until all expired timeouts processed + // timeout = &timer_queue[0].timeout; + // (shouldn't be necessary; address hasn't changed...) + } + // arm timerfd with timeout from head of queue + set_timer_from_queue(); + } + else { + Base::receiveFdEvent(loop_mech, fd_r, userdata, flags); + } + } + + template void init(T *loop_mech) + { + timerfd_fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK); + if (timerfd_fd == -1) { + throw std::system_error(errno, std::system_category()); + } + loop_mech->addFdWatch(timerfd_fd, &timerfd_fd, IN_EVENTS); + Base::init(loop_mech); + } + + // Add timer, return handle (TODO: clock id param?) + void addTimer(timer_handle_t &h, void *userdata) + { + timer_queue.allocate(h, userdata); + } + + void removeTimer(timer_handle_t &timer_id) noexcept + { + removeTimer_nolock(timer_id); + } + + void removeTimer_nolock(timer_handle_t &timer_id) noexcept + { + if (timer_queue.is_queued(timer_id)) { + timer_queue.remove(timer_id); + } + timer_queue.deallocate(timer_id); + } + + // starts (if not started) a timer to timeout at the given time. Resets the expiry count to 0. + // enable: specifies whether to enable reporting of timeouts/intervals + void setTimer(timer_handle_t & timer_id, struct timespec &timeout, struct timespec &interval, bool enable) noexcept + { + auto &ts = timer_queue.node_data(timer_id); + ts.interval_time = interval; + ts.expiry_count = 0; + ts.enabled = enable; + + if (timer_queue.is_queued(timer_id)) { + // Already queued; alter timeout + if (timer_queue.set_priority(timer_id, timeout)) { + set_timer_from_queue(); + } + } + else { + if (timer_queue.insert(timer_id, timeout)) { + set_timer_from_queue(); + } + } + + // TODO locking (here and everywhere) + } + + // Set timer relative to current time: + void setTimerRel(timer_handle_t & timer_id, struct timespec &timeout, struct timespec &interval, bool enable) noexcept + { + // TODO consider caching current time somehow; need to decide then when to update cached value. + struct timespec curtime; + clock_gettime(CLOCK_MONOTONIC, &curtime); + curtime.tv_sec += timeout.tv_sec; + curtime.tv_nsec += timeout.tv_nsec; + if (curtime.tv_nsec > 1000000000) { + curtime.tv_nsec -= 1000000000; + curtime.tv_sec++; + } + setTimer(timer_id, curtime, interval, enable); + } + + // Enables or disabling report of timeouts (does not stop timer) + void enableTimer(timer_handle_t & timer_id, bool enable) noexcept + { + enableTimer_nolock(timer_id, enable); + } + + void enableTimer_nolock(timer_handle_t & timer_id, bool enable) noexcept + { + timer_queue.node_data(timer_id).enabled = enable; + } +}; + +} diff --git a/src/dasynq/dasynq.h b/src/dasynq/dasynq.h index 6dc785a..560a44e 100644 --- a/src/dasynq/dasynq.h +++ b/src/dasynq/dasynq.h @@ -1,31 +1,37 @@ -#ifndef DASYNC_H_INCLUDED -#define DASYNC_H_INCLUDED +#ifndef DASYNQ_H_INCLUDED +#define DASYNQ_H_INCLUDED #if defined(__OpenBSD__) -#define HAVE_KQUEUE 1 +#define DASYNQ_HAVE_KQUEUE 1 #endif #if defined(__linux__) -#define HAVE_EPOLL 1 +#define DASYNQ_HAVE_EPOLL 1 #endif #include "dasynq-flags.h" +#include "dasynq-binaryheap.h" -#if defined(HAVE_KQUEUE) +#if defined(DASYNQ_CUSTOM_LOOP_IMPLEMENTATION) +// Loop and LoopTraits defined already; used for testing +#elif defined(DASYNQ_HAVE_KQUEUE) #include "dasynq-kqueue.h" +#include "dasynq-itimer.h" #include "dasynq-childproc.h" namespace dasynq { - template using Loop = KqueueLoop; + template using Loop = KqueueLoop>>; using LoopTraits = KqueueTraits; } -#elif defined(HAVE_EPOLL) +#elif defined(DASYNQ_HAVE_EPOLL) #include "dasynq-epoll.h" +#include "dasynq-timerfd.h" #include "dasynq-childproc.h" namespace dasynq { - template using Loop = EpollLoop; + template using Loop = EpollLoop>>; using LoopTraits = EpollTraits; } #endif + #include #include #include @@ -44,9 +50,9 @@ namespace dasynq { #ifdef __GNUC__ #ifndef __clang__ -#define EMPTY_BODY char empty[0]; // Make class instances take up no space (gcc) +#define DASYNQ_EMPTY_BODY char empty[0]; // Make class instances take up no space (gcc) #else -#define EMPTY_BODY char empty[0] __attribute__((unused)); // Make class instances take up no space (clang) +#define DASYNQ_EMPTY_BODY char empty[0] __attribute__((unused)); // Make class instances take up no space (clang) #endif #endif @@ -54,11 +60,20 @@ namespace dasynq { namespace dasynq { +namespace dprivate { + class BaseWatcher; +} + +using PrioQueue = BinaryHeap; + +inline namespace { + constexpr int DEFAULT_PRIORITY = 50; +} /** * Values for rearm/disarm return from event handlers */ -enum class Rearm +enum class rearm { /** Re-arm the event watcher so that it receives further events */ REARM, @@ -73,28 +88,37 @@ enum class Rearm // TODO: add a REQUEUE option, which means, "I didn't complete input/output, run me again soon" }; +// Different timer clock types +enum class ClockType +{ + WALLTIME, + MONOTONIC +}; + // Information about a received signal. // This is essentially a wrapper for the POSIX siginfo_t; its existence allows for mechanisms that receive // equivalent signal information in a different format (eg signalfd on Linux). using SigInfo = LoopTraits::SigInfo; // Forward declarations: -template class EventLoop; +template class event_loop; namespace dprivate { // (non-public API) - template class FdWatcher; - template class BidiFdWatcher; - template class SignalWatcher; - template class ChildProcWatcher; + template class FdWatcher; + template class BidiFdWatcher; + template class SignalWatcher; + template class ChildProcWatcher; + template class Timer; enum class WatchType { SIGNAL, FD, CHILD, - SECONDARYFD + SECONDARYFD, + TIMER }; template class EventDispatch; @@ -108,15 +132,20 @@ namespace dprivate { class BaseWatcher { template friend class EventDispatch; - template friend class dasynq::EventLoop; + template friend class dasynq::event_loop; protected: WatchType watchType; - int active : 1; - int deleteme : 1; + int active : 1; // currently executing handler? + int deleteme : 1; // delete when handler finished? - BaseWatcher * prev; - BaseWatcher * next; + PrioQueue::handle_t heap_handle; + int priority; + + static void set_priority(BaseWatcher &p, int prio) + { + p.priority = prio; + } public: @@ -125,8 +154,8 @@ namespace dprivate { { active = false; deleteme = false; - prev = nullptr; - next = nullptr; + PrioQueue::init_handle(heap_handle); + priority = DEFAULT_PRIORITY; } BaseWatcher(WatchType wt) noexcept : watchType(wt) { } @@ -149,7 +178,7 @@ namespace dprivate { class BaseSignalWatcher : public BaseWatcher { template friend class EventDispatch; - friend class dasynq::EventLoop; + friend class dasynq::event_loop; protected: SigInfo siginfo; @@ -158,14 +187,14 @@ namespace dprivate { public: typedef SigInfo &SigInfo_p; - virtual Rearm received(EventLoop &eloop, int signo, SigInfo_p siginfo) = 0; + virtual rearm received(event_loop &eloop, int signo, SigInfo_p siginfo) = 0; }; template class BaseFdWatcher : public BaseWatcher { template friend class EventDispatch; - friend class dasynq::EventLoop; + friend class dasynq::event_loop; protected: int watch_fd; @@ -177,19 +206,19 @@ namespace dprivate { BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { } public: - virtual Rearm fdEvent(EventLoop &eloop, int fd, int flags) = 0; + virtual rearm fdEvent(event_loop &eloop, int fd, int flags) = 0; }; template class BaseBidiFdWatcher : public BaseFdWatcher { template friend class EventDispatch; - friend class dasynq::EventLoop; + friend class dasynq::event_loop; // This should never actually get called: - Rearm fdEvent(EventLoop &eloop, int fd, int flags) final + rearm fdEvent(event_loop &eloop, int fd, int flags) final { - return Rearm::REARM; // should not be reachable. + return rearm::REARM; // should not be reachable. }; protected: @@ -202,15 +231,15 @@ namespace dprivate { int write_removed : 1; // write watch removed? public: - virtual Rearm readReady(EventLoop &eloop, int fd) noexcept = 0; - virtual Rearm writeReady(EventLoop &eloop, int fd) noexcept = 0; + virtual rearm readReady(event_loop &eloop, int fd) noexcept = 0; + virtual rearm writeReady(event_loop &eloop, int fd) noexcept = 0; }; template class BaseChildWatcher : public BaseWatcher { template friend class EventDispatch; - friend class dasynq::EventLoop; + friend class dasynq::event_loop; protected: pid_t watch_pid; @@ -219,7 +248,30 @@ namespace dprivate { BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { } public: - virtual Rearm childStatus(EventLoop &eloop, pid_t child, int status) = 0; + virtual rearm childStatus(event_loop &eloop, pid_t child, int status) = 0; + }; + + + template + class BaseTimerWatcher : public BaseWatcher + { + template friend class EventDispatch; + friend class dasynq::event_loop; + + protected: + timer_handle_t timer_handle; + int intervals; + + BaseTimerWatcher() : BaseWatcher(WatchType::TIMER) + { + init_timer_handle(timer_handle); + } + + public: + // Timer expired, and the given number of intervals have elapsed before + // expiry evenet was queued. Normally intervals == 1 to indicate no + // overrun. + virtual rearm timerExpiry(event_loop &eloop, int intervals) = 0; }; // Classes for implementing a fair(ish) wait queue. @@ -249,13 +301,13 @@ namespace dprivate { template <> class waitqueue_node { // Specialised waitqueue_node for NullMutex. - // TODO can this be reduced to 0 data members? friend class waitqueue; - waitqueue_node * next = nullptr; public: void wait(std::unique_lock &ul) { } void signal() { } + + DASYNQ_EMPTY_BODY; }; template class waitqueue_node @@ -276,6 +328,34 @@ namespace dprivate { } }; + template <> class waitqueue + { + public: + waitqueue_node * unqueue() + { + return nullptr; + } + + waitqueue_node * getHead() + { + return nullptr; + } + + bool checkHead(waitqueue_node &node) + { + return true; + } + + bool isEmpty() + { + return true; + } + + void queue(waitqueue_node *node) + { + } + }; + template class waitqueue { waitqueue_node * tail = nullptr; @@ -293,6 +373,16 @@ namespace dprivate { return head; } + bool checkHead(waitqueue_node &node) + { + return head == &node; + } + + bool isEmpty() + { + return head == nullptr; + } + void queue(waitqueue_node *node) { if (tail) { @@ -303,68 +393,65 @@ namespace dprivate { } } }; - + // This class serves as the base class (mixin) for the AEN mechanism class. // // The EventDispatch class maintains the queued event data structures. It inserts watchers // into the queue when eventes are received (receiveXXX methods). template class EventDispatch : public Traits { - friend class EventLoop; + friend class event_loop; // queue data structure/pointer - BaseWatcher * first = nullptr; + PrioQueue event_queue; using BaseSignalWatcher = dasynq::dprivate::BaseSignalWatcher; using BaseFdWatcher = dasynq::dprivate::BaseFdWatcher; using BaseBidiFdWatcher = dasynq::dprivate::BaseBidiFdWatcher; using BaseChildWatcher = dasynq::dprivate::BaseChildWatcher; + using BaseTimerWatcher = dasynq::dprivate::BaseTimerWatcher; - void queueWatcher(BaseWatcher *bwatcher) + // Add a watcher into the queuing system (but don't queue it) + // may throw: std::bad_alloc + void prepare_watcher(BaseWatcher *bwatcher) { - // Put in queue: - if (first == nullptr) { - bwatcher->prev = bwatcher; - bwatcher->next = bwatcher; - first = bwatcher; - } - else { - first->prev->next = bwatcher; - bwatcher->prev = first->prev; - first->prev = bwatcher; - bwatcher->next = first; - } + event_queue.allocate(bwatcher->heap_handle, bwatcher); + } + + void queueWatcher(BaseWatcher *bwatcher) noexcept + { + event_queue.insert(bwatcher->heap_handle, bwatcher->priority); } bool isQueued(BaseWatcher *bwatcher) { - return bwatcher->prev != nullptr; + return event_queue.is_queued(bwatcher->heap_handle); } void dequeueWatcher(BaseWatcher *bwatcher) { - if (bwatcher->prev == bwatcher) { - // Only item in queue - first = nullptr; + if (event_queue.is_queued(bwatcher->heap_handle)) { + event_queue.remove(bwatcher->heap_handle); } - else { - if (first == bwatcher) first = first->next; - bwatcher->prev->next = bwatcher->next; - bwatcher->next->prev = bwatcher->prev; - } - - bwatcher->prev = nullptr; - bwatcher->next = nullptr; + } + + // Remove watcher from the queueing system + void release_watcher(BaseWatcher *bwatcher) + { + event_queue.deallocate(bwatcher->heap_handle); } protected: T_Mutex lock; - void receiveSignal(typename Traits::SigInfo & siginfo, void * userdata) + // Receive a signal; return true to disable signal watch or false to leave enabled + template + bool receiveSignal(T &loop_mech, typename Traits::SigInfo & siginfo, void * userdata) { BaseSignalWatcher * bwatcher = static_cast(userdata); bwatcher->siginfo = siginfo; queueWatcher(bwatcher); + return true; } template @@ -411,13 +498,23 @@ namespace dprivate { queueWatcher(watcher); } + void receiveTimerExpiry(timer_handle_t & timer_handle, void * userdata, int intervals) + { + BaseTimerWatcher * watcher = static_cast(userdata); + watcher->intervals = intervals; + queueWatcher(watcher); + } + // Pull a single event from the queue BaseWatcher * pullEvent() { - BaseWatcher * r = first; - if (r != nullptr) { - dequeueWatcher(r); + if (event_queue.empty()) { + return nullptr; } + + auto rhndl = event_queue.get_root(); + BaseWatcher *r = event_queue.node_data(rhndl); + event_queue.pull_root(); return r; } @@ -433,13 +530,13 @@ namespace dprivate { // If the watcher is active, set deleteme true; the watcher will be removed // at the end of current processing (i.e. when active is set false). watcher->deleteme = true; + release_watcher(watcher); lock.unlock(); } else { // Actually do the delete. - if (isQueued(watcher)) { - dequeueWatcher(watcher); - } + dequeueWatcher(watcher); + release_watcher(watcher); lock.unlock(); watcher->watchRemoved(); @@ -452,24 +549,22 @@ namespace dprivate { if (watcher->active) { watcher->deleteme = true; + release_watcher(watcher); } else { - if (isQueued(watcher)) { - dequeueWatcher(watcher); - } - + dequeueWatcher(watcher); + release_watcher(watcher); watcher->read_removed = true; } BaseWatcher *secondary = &(watcher->outWatcher); if (secondary->active) { secondary->deleteme = true; + release_watcher(watcher); } else { - if (isQueued(secondary)) { - dequeueWatcher(secondary); - } - + dequeueWatcher(secondary); + release_watcher(watcher); watcher->write_removed = true; } @@ -485,12 +580,13 @@ namespace dprivate { } -template class EventLoop +template class event_loop { - friend class dprivate::FdWatcher>; - friend class dprivate::BidiFdWatcher>; - friend class dprivate::SignalWatcher>; - friend class dprivate::ChildProcWatcher>; + friend class dprivate::FdWatcher>; + friend class dprivate::BidiFdWatcher>; + friend class dprivate::SignalWatcher>; + friend class dprivate::ChildProcWatcher>; + friend class dprivate::Timer>; public: using LoopTraits = dasynq::LoopTraits; @@ -504,9 +600,10 @@ template class EventLoop using BaseFdWatcher = dprivate::BaseFdWatcher; using BaseBidiFdWatcher = dprivate::BaseBidiFdWatcher; using BaseChildWatcher = dprivate::BaseChildWatcher; + using BaseTimerWatcher = dprivate::BaseTimerWatcher; using WatchType = dprivate::WatchType; - Loop>> loop_mech; + Loop> loop_mech; // There is a complex problem with most asynchronous event notification mechanisms // when used in a multi-threaded environment. Generally, a file descriptor or other @@ -563,7 +660,14 @@ template class EventLoop void registerSignal(BaseSignalWatcher *callBack, int signo) { - loop_mech.addSignalWatch(signo, callBack); + loop_mech.prepare_watcher(callBack); + try { + loop_mech.addSignalWatch(signo, callBack); + } + catch (...) { + loop_mech.release_watcher(callBack); + throw; + } } void deregister(BaseSignalWatcher *callBack, int signo) noexcept @@ -581,16 +685,37 @@ template class EventLoop void registerFd(BaseFdWatcher *callback, int fd, int eventmask, bool enabled) { - loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, enabled); + loop_mech.prepare_watcher(callback); + try { + loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, enabled); + } + catch (...) { + loop_mech.release_watcher(callback); + throw; + } } void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask) { - if (LoopTraits::has_separate_rw_fd_watches) { - loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT); + loop_mech.prepare_watcher(callback); + try { + loop_mech.prepare_watcher(&callback->outWatcher); + try { + if (LoopTraits::has_separate_rw_fd_watches) { + loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT); + } + else { + loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT); + } + } + catch (...) { + loop_mech.release_watcher(&callback->outWatcher); + throw; + } } - else { - loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT); + catch (...) { + loop_mech.release_watcher(callback); + throw; } } @@ -645,19 +770,34 @@ template class EventLoop releaseLock(qnode); } - void reserveChildWatch(BaseChildWatcher *callBack) + void reserveChildWatch(BaseChildWatcher *callback) { - loop_mech.reserveChildWatch(); + loop_mech.prepare_watcher(callback); + try { + loop_mech.reserveChildWatch(); + } + catch (...) { + loop_mech.release_watcher(callback); + throw; + } } - void unreserve(BaseChildWatcher *callBack) + void unreserve(BaseChildWatcher *callback) { loop_mech.unreserveChildWatch(); + loop_mech.release_watcher(callback); } - void registerChild(BaseChildWatcher *callBack, pid_t child) + void registerChild(BaseChildWatcher *callback, pid_t child) { - loop_mech.addChildWatch(child, callBack); + loop_mech.prepare_watcher(callback); + try { + loop_mech.addChildWatch(child, callback); + } + catch (...) { + loop_mech.release_watcher(callback); + throw; + } } void registerReservedChild(BaseChildWatcher *callBack, pid_t child) noexcept @@ -683,11 +823,55 @@ template class EventLoop releaseLock(qnode); } - void dequeueWatcher(BaseWatcher *watcher) noexcept + void registerTimer(BaseTimerWatcher *callback) { - if (loop_mech.isQueued(watcher)) { - loop_mech.dequeueWatcher(watcher); + loop_mech.prepare_watcher(callback); + try { + loop_mech.addTimer(callback->timer_handle, callback); } + catch (...) { + loop_mech.release_watcher(callback); + } + } + + void setTimer(BaseTimerWatcher *callBack, struct timespec &timeout) + { + struct timespec interval {0, 0}; + loop_mech.setTimer(callBack->timer_handle, timeout, interval, true); + } + + void setTimer(BaseTimerWatcher *callBack, struct timespec &timeout, struct timespec &interval) + { + loop_mech.setTimer(callBack->timer_handle, timeout, interval, true); + } + + void setTimerRel(BaseTimerWatcher *callBack, struct timespec &timeout) + { + struct timespec interval {0, 0}; + loop_mech.setTimerRel(callBack->timer_handle, timeout, interval, true); + } + + void setTimerRel(BaseTimerWatcher *callBack, struct timespec &timeout, struct timespec &interval) + { + loop_mech.setTimerRel(callBack->timer_handle, timeout, interval, true); + } + + void deregister(BaseTimerWatcher *callback) + { + loop_mech.removeTimer(callback->timer_handle); + + waitqueue_node qnode; + getAttnLock(qnode); + + EventDispatch & ed = (EventDispatch &) loop_mech; + ed.issueDelete(callback); + + releaseLock(qnode); + } + + void dequeueWatcher(BaseWatcher *watcher) noexcept + { + loop_mech.dequeueWatcher(watcher); } // Acquire the attention lock (when held, ensures that no thread is polling the AEN @@ -696,9 +880,9 @@ template class EventLoop { std::unique_lock ulock(wait_lock); attn_waitqueue.queue(&qnode); - if (attn_waitqueue.getHead() != &qnode) { + if (! attn_waitqueue.checkHead(qnode)) { loop_mech.interruptWait(); - while (attn_waitqueue.getHead() != &qnode) { + while (! attn_waitqueue.checkHead(qnode)) { qnode.wait(ulock); } } @@ -709,7 +893,7 @@ template class EventLoop void getPollwaitLock(waitqueue_node &qnode) { std::unique_lock ulock(wait_lock); - if (attn_waitqueue.getHead() == nullptr) { + if (attn_waitqueue.isEmpty()) { // Queue is completely empty: attn_waitqueue.queue(&qnode); } @@ -717,7 +901,7 @@ template class EventLoop wait_waitqueue.queue(&qnode); } - while (attn_waitqueue.getHead() != &qnode) { + while (! attn_waitqueue.checkHead(qnode)) { qnode.wait(ulock); } } @@ -739,18 +923,18 @@ template class EventLoop } } - void processSignalRearm(BaseSignalWatcher * bsw, Rearm rearmType) + void processSignalRearm(BaseSignalWatcher * bsw, rearm rearmType) { // Called with lock held - if (rearmType == Rearm::REARM) { + if (rearmType == rearm::REARM) { loop_mech.rearmSignalWatch_nolock(bsw->siginfo.get_signo()); } - else if (rearmType == Rearm::REMOVE) { + else if (rearmType == rearm::REMOVE) { loop_mech.removeSignalWatch_nolock(bsw->siginfo.get_signo()); } } - Rearm processFdRearm(BaseFdWatcher * bfw, Rearm rearmType, bool is_multi_watch) + rearm processFdRearm(BaseFdWatcher * bfw, rearm rearmType, bool is_multi_watch) { // Called with lock held; // bdfw->event_flags contains only with pending (queued) events @@ -758,28 +942,28 @@ template class EventLoop if (is_multi_watch) { BaseBidiFdWatcher * bdfw = static_cast(bfw); - if (rearmType == Rearm::REMOVE) { + if (rearmType == rearm::REMOVE) { bdfw->read_removed = 1; bdfw->watch_flags &= ~IN_EVENTS; if (! LoopTraits::has_separate_rw_fd_watches) { if (! bdfw->write_removed) { - return Rearm::NOOP; + return rearm::NOOP; } else { // both removed: actually remove loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */); - return Rearm::REMOVE; + return rearm::REMOVE; } } else { loop_mech.removeFdWatch_nolock(bdfw->watch_fd, IN_EVENTS); } } - else if (rearmType == Rearm::DISARM) { + else if (rearmType == rearm::DISARM) { // Nothing more to do } - else if (rearmType == Rearm::REARM) { + else if (rearmType == rearm::REARM) { bdfw->watch_flags |= IN_EVENTS; if (! LoopTraits::has_separate_rw_fd_watches) { @@ -799,12 +983,12 @@ template class EventLoop } return rearmType; } - else { - if (rearmType == Rearm::REARM) { + else { // Not multi-watch: + if (rearmType == rearm::REARM) { loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw, (bfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT); } - else if (rearmType == Rearm::REMOVE) { + else if (rearmType == rearm::REMOVE) { loop_mech.removeFdWatch_nolock(bfw->watch_fd, bfw->watch_flags); } return rearmType; @@ -812,32 +996,32 @@ template class EventLoop } // Process re-arm for the secondary (output) watcher in a Bi-direction Fd watcher. - Rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, Rearm rearmType) + rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, rearm rearmType) { // Called with lock held - if (rearmType == Rearm::REMOVE) { + if (rearmType == rearm::REMOVE) { bdfw->write_removed = 1; bdfw->watch_flags &= ~OUT_EVENTS; if (LoopTraits::has_separate_rw_fd_watches) { loop_mech.removeFdWatch_nolock(bdfw->watch_fd, OUT_EVENTS); - return bdfw->read_removed ? Rearm::REMOVE : Rearm::NOOP; + return bdfw->read_removed ? rearm::REMOVE : rearm::NOOP; } else { if (! bdfw->read_removed) { - return Rearm::NOOP; + return rearm::NOOP; } else { // both removed: actually remove loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */); - return Rearm::REMOVE; + return rearm::REMOVE; } } } - else if (rearmType == Rearm::DISARM) { + else if (rearmType == rearm::DISARM) { // Nothing more to do } - else if (rearmType == Rearm::REARM) { + else if (rearmType == rearm::REARM) { bdfw->watch_flags |= OUT_EVENTS; if (! LoopTraits::has_separate_rw_fd_watches) { @@ -857,6 +1041,17 @@ template class EventLoop } return rearmType; } + + void processTimerRearm(BaseTimerWatcher *btw, rearm rearmType) + { + // Called with lock held + if (rearmType == rearm::REARM) { + loop_mech.enableTimer_nolock(btw->timer_handle, true); + } + else if (rearmType == rearm::REMOVE) { + loop_mech.removeTimer_nolock(btw->timer_handle); + } + } bool processEvents() noexcept { @@ -874,7 +1069,7 @@ template class EventLoop pqueue->active = true; active = true; - Rearm rearmType = Rearm::NOOP; + rearm rearmType = rearm::NOOP; bool is_multi_watch = false; BaseBidiFdWatcher *bbfw = nullptr; @@ -939,18 +1134,23 @@ template class EventLoop bbfw->event_flags &= ~OUT_EVENTS; break; } + case WatchType::TIMER: { + BaseTimerWatcher *btw = static_cast(pqueue); + rearmType = btw->timerExpiry(*this, btw->intervals); + break; + } default: ; } ed.lock.lock(); // (if REMOVED, we must not touch pqueue at all) - if (rearmType != Rearm::REMOVED) { + if (rearmType != rearm::REMOVED) { pqueue->active = false; if (pqueue->deleteme) { // We don't want a watch that is marked "deleteme" to re-arm itself. - rearmType = Rearm::REMOVE; + rearmType = rearm::REMOVE; } switch (pqueue->watchType) { case WatchType::SIGNAL: @@ -962,11 +1162,17 @@ template class EventLoop case WatchType::SECONDARYFD: rearmType = processSecondaryRearm(bbfw, rearmType); break; + case WatchType::TIMER: + processTimerRearm(static_cast(pqueue), rearmType); + break; default: ; } - if (pqueue->deleteme || rearmType == Rearm::REMOVE) { + if (rearmType == rearm::REMOVE) { ed.lock.unlock(); + // Note that for BidiFd watches, watchRemoved is only called on the primary watch. + // The process function called above only returns Rearm::REMOVE if both primary and + // secondary watches have been removed. (is_multi_watch ? bbfw : pqueue)->watchRemoved(); ed.lock.lock(); } @@ -983,10 +1189,11 @@ template class EventLoop public: using mutex_t = T_Mutex; - using FdWatcher = dprivate::FdWatcher>; - using BidiFdWatcher = dprivate::BidiFdWatcher>; - using SignalWatcher = dprivate::SignalWatcher>; - using ChildProcWatcher = dprivate::ChildProcWatcher>; + using FdWatcher = dprivate::FdWatcher>; + using BidiFdWatcher = dprivate::BidiFdWatcher>; + using SignalWatcher = dprivate::SignalWatcher>; + using ChildProcWatcher = dprivate::ChildProcWatcher>; + using Timer = dprivate::Timer>; // using LoopTraits = dasynq::LoopTraits; @@ -1008,8 +1215,8 @@ template class EventLoop } }; -typedef EventLoop NEventLoop; -typedef EventLoop TEventLoop; +typedef event_loop NEventLoop; +typedef event_loop TEventLoop; // from dasync.cc: TEventLoop & getSystemLoop(); @@ -1030,9 +1237,10 @@ public: // If an attempt is made to register with more than one event loop at // a time, behaviour is undefined. The signal should be masked before // call. - inline void addWatch(EventLoop &eloop, int signo) + inline void addWatch(EventLoop &eloop, int signo, int prio = DEFAULT_PRIORITY) { BaseWatcher::init(); + this->priority = prio; this->siginfo.set_signo(signo); eloop.registerSignal(this, signo); } @@ -1042,7 +1250,7 @@ public: eloop.deregister(this, this->siginfo.get_signo()); } - // virtual Rearm received(EventLoop &, int signo, SigInfo_p info) = 0; + // virtual rearm received(EventLoop &, int signo, SigInfo_p info) = 0; }; // Posix file descriptor event watcher @@ -1078,9 +1286,10 @@ class FdWatcher : private dprivate::BaseFdWatcher // causes undefined behavior. // // Can fail with std::bad_alloc or std::system_error. - void addWatch(EventLoop &eloop, int fd, int flags, bool enabled = true) + void addWatch(EventLoop &eloop, int fd, int flags, bool enabled = true, int prio = DEFAULT_PRIORITY) { BaseWatcher::init(); + this->priority = prio; this->watch_fd = fd; this->watch_flags = flags; eloop.registerFd(this, fd, flags, enabled); @@ -1098,7 +1307,7 @@ class FdWatcher : private dprivate::BaseFdWatcher // (which will not occur until the event handler returns, if it is active). // In a single threaded environment, it is safe to delete the watcher after // calling this method as long as the handler (if it is active) accesses no - // internal state and returns Rearm::REMOVED. + // internal state and returns rearm::REMOVED. void deregister(EventLoop &eloop) noexcept { eloop.deregister(this, this->watch_fd); @@ -1113,7 +1322,39 @@ class FdWatcher : private dprivate::BaseFdWatcher } } - // virtual Rearm fdEvent(EventLoop *, int fd, int flags) = 0; + // Add an Fd watch via a lambda. The watch is allocated dynamically and destroys + // itself when removed from the event loop. + template + static FdWatcher *addWatch(EventLoop &eloop, int fd, int flags, T watchHndlr) + { + class LambdaFdWatcher : public FdWatcher + { + private: + T watchHndlr; + + public: + LambdaFdWatcher(T watchHandlr_a) : watchHndlr(watchHandlr_a) + { + // + } + + rearm fdEvent(EventLoop &eloop, int fd, int flags) override + { + return watchHndlr(eloop, fd, flags); + } + + void watchRemoved() noexcept override + { + delete this; + } + }; + + LambdaFdWatcher * lfd = new LambdaFdWatcher(watchHndlr); + lfd->addWatch(eloop, fd, flags); + return lfd; + } + + // virtual rearm fdEvent(EventLoop &, int fd, int flags) = 0; }; // A Bi-directional file descriptor watcher with independent read- and write- channels. @@ -1196,7 +1437,7 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcheroutWatcher.BaseWatcher::init(); @@ -1204,6 +1445,8 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcherwatch_flags = flags | dprivate::multi_watch; this->read_removed = false; this->write_removed = false; + this->priority = inprio; + this->set_priority(this->outWatcher, outprio); eloop.registerFd(this, fd, flags); } @@ -1219,14 +1462,14 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcherwatch_fd); } - // Rearm readReady(EventLoop * eloop, int fd) noexcept - // Rearm writeReady(EventLoop * eloop, int fd) noexcept + // rearm readReady(EventLoop * eloop, int fd) noexcept + // rearm writeReady(EventLoop * eloop, int fd) noexcept }; // Child process event watcher @@ -1254,10 +1497,11 @@ class ChildProcWatcher : private dprivate::BaseChildWatcherwatch_pid = child; + this->priority = prio; eloop.registerChild(this, child); } @@ -1265,10 +1509,13 @@ class ChildProcWatcher : private dprivate::BaseChildWatcherwatch_pid = child; + this->priority = prio; eloop.registerReservedChild(this, child); } @@ -1360,7 +1607,46 @@ class ChildProcWatcher : private dprivate::BaseChildWatcher +class Timer : private BaseTimerWatcher +{ + public: + + // Allocate a timer (using the MONOTONIC clock) + void addTimer(EventLoop &eloop, int prio = DEFAULT_PRIORITY) + { + this->priority = prio; + eloop.registerTimer(this); + } + + void armTimer(EventLoop &eloop, struct timespec &timeout) noexcept + { + eloop.setTimer(this, timeout); + } + + void armTimer(EventLoop &eloop, struct timespec &timeout, struct timespec &interval) noexcept + { + eloop.setTimer(this, timeout, interval); + } + + // Arm timer, relative to now: + void armTimerRel(EventLoop &eloop, struct timespec &timeout) noexcept + { + eloop.setTimerRel(this, timeout); + } + + void armTimerRel(EventLoop &eloop, struct timespec &timeout, struct timespec &interval) noexcept + { + eloop.setTimerRel(this, timeout, interval); + } + + void deregister(EventLoop &eloop) noexcept + { + eloop.deregister(this); + } }; } // namespace dasynq::dprivate diff --git a/src/dinit-log.cc b/src/dinit-log.cc index 77aeda7..af46516 100644 --- a/src/dinit-log.cc +++ b/src/dinit-log.cc @@ -50,7 +50,7 @@ class BufferedLogStream : public EventLoop_t::FdWatcher release = false; } - Rearm fdEvent(EventLoop_t &loop, int fd, int flags) noexcept override; + rearm fdEvent(EventLoop_t &loop, int fd, int flags) noexcept override; // Check whether the console can be released. void flushForRelease(); @@ -119,7 +119,7 @@ void BufferedLogStream::flushForRelease() // Try to flush any messages that are currently buffered. (Console is non-blocking // so it will fail gracefully). - if (fdEvent(eventLoop, fd, OUT_EVENTS) == Rearm::DISARM) { + if (fdEvent(eventLoop, fd, OUT_EVENTS) == rearm::DISARM) { // Console has already been released at this point. setEnabled(eventLoop, false); } @@ -127,7 +127,7 @@ void BufferedLogStream::flushForRelease() // release when it's finished. } -Rearm BufferedLogStream::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept +rearm BufferedLogStream::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept { if ((! partway) && (! special) && discarded) { special_buf = "dinit: *** message discarded due to full buffer ****\n"; @@ -147,25 +147,25 @@ Rearm BufferedLogStream::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept if (release) { release_console(); - return Rearm::DISARM; + return rearm::DISARM; } } else { msg_index += r; - return Rearm::REARM; + return rearm::REARM; } } else if (errno != EAGAIN && errno != EINTR && errno != EWOULDBLOCK) { - return Rearm::REMOVE; + return rearm::REMOVE; } - return Rearm::REARM; + return rearm::REARM; } else { // Writing from the regular circular buffer if (current_index == 0) { release_console(); - return Rearm::DISARM; + return rearm::DISARM; } // We try to find a complete line (terminated by '\n') in the buffer, and write it @@ -218,18 +218,18 @@ Rearm BufferedLogStream::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept if (current_index == 0 || release) { // No more messages buffered / stop logging to console: release_console(); - return Rearm::DISARM; + return rearm::DISARM; } } } else if (errno != EAGAIN && errno != EINTR && errno != EWOULDBLOCK) { - return Rearm::REMOVE; + return rearm::REMOVE; } } // We've written something by the time we get here. We could fall through to below, but // let's give other events a chance to be processed by returning now. - return Rearm::REARM; + return rearm::REARM; } // Initialise the logging subsystem diff --git a/src/dinit.cc b/src/dinit.cc index f9baf50..447fff4 100644 --- a/src/dinit.cc +++ b/src/dinit.cc @@ -37,7 +37,7 @@ using namespace dasynq; -using EventLoop_t = EventLoop; +using EventLoop_t = event_loop; EventLoop_t eventLoop = EventLoop_t(); @@ -54,10 +54,10 @@ void setup_external_log() noexcept; class ControlSocketWatcher : public EventLoop_t::FdWatcher { - Rearm fdEvent(EventLoop_t &loop, int fd, int flags) override + rearm fdEvent(EventLoop_t &loop, int fd, int flags) override { control_socket_cb(&loop, fd); - return Rearm::REARM; + return rearm::REARM; } }; @@ -119,19 +119,19 @@ namespace { this->cb_func = cb_func; } - Rearm received(EventLoop_t &eloop, int signo, SigInfo_p siginfo) override + rearm received(EventLoop_t &eloop, int signo, SigInfo_p siginfo) override { service_set->stop_all_services(ShutdownType::REBOOT); - return Rearm::REARM; + return rearm::REARM; } }; class ControlSocketWatcher : public EventLoop_t::FdWatcher { - Rearm fdEvent(EventLoop_t &loop, int fd, int flags) + rearm fdEvent(EventLoop_t &loop, int fd, int flags) { control_socket_cb(&loop, fd); - return Rearm::REARM; + return rearm::REARM; } }; } diff --git a/src/service.cc b/src/service.cc index a8ea823..bc0e8a7 100644 --- a/src/service.cc +++ b/src/service.cc @@ -110,7 +110,7 @@ void ServiceRecord::stopped() noexcept } } -dasynq::Rearm ServiceChildWatcher::childStatus(EventLoop_t &loop, pid_t child, int status) noexcept +dasynq::rearm ServiceChildWatcher::childStatus(EventLoop_t &loop, pid_t child, int status) noexcept { ServiceRecord *sr = service; @@ -125,14 +125,14 @@ dasynq::Rearm ServiceChildWatcher::childStatus(EventLoop_t &loop, pid_t child, i if (sr->waiting_for_execstat) { // We still don't have an exec() status from the forked child, wait for that // before doing any further processing. - return Rearm::REMOVE; + return rearm::REMOVE; } // Must deregister now since handle_exit_status might result in re-launch: deregister(loop, child); sr->handle_exit_status(); - return Rearm::REMOVED; + return rearm::REMOVED; } bool ServiceRecord::do_auto_restart() noexcept @@ -251,7 +251,7 @@ void ServiceRecord::handle_exit_status() noexcept } } -Rearm ServiceIoWatcher::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept +rearm ServiceIoWatcher::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept { ServiceRecord *sr = service; sr->waiting_for_execstat = false; @@ -293,7 +293,7 @@ Rearm ServiceIoWatcher::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept sr->service_set->processQueues(true); - return Rearm::REMOVED; + return rearm::REMOVED; } void ServiceRecord::require() noexcept diff --git a/src/service.h b/src/service.h index 179799f..fa9b16a 100644 --- a/src/service.h +++ b/src/service.h @@ -186,7 +186,7 @@ class ServiceChildWatcher : public EventLoop_t::ChildProcWatcher { public: ServiceRecord * service; - Rearm childStatus(EventLoop_t &eloop, pid_t child, int status) noexcept; + rearm childStatus(EventLoop_t &eloop, pid_t child, int status) noexcept; ServiceChildWatcher(ServiceRecord * sr) noexcept : service(sr) { } }; @@ -195,7 +195,7 @@ class ServiceIoWatcher : public EventLoop_t::FdWatcher { public: ServiceRecord * service; - Rearm fdEvent(EventLoop_t &eloop, int fd, int flags) noexcept; + rearm fdEvent(EventLoop_t &eloop, int fd, int flags) noexcept; ServiceIoWatcher(ServiceRecord * sr) noexcept : service(sr) { } }; -- 2.25.1