From: Davin McCall Date: Mon, 27 Mar 2017 21:24:13 +0000 (+0100) Subject: Update Dasynq library and API usage X-Git-Tag: v0.05~4 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=6695d7ae18e7e7d11335d4b7fdb9bfdfcb807be1;p=oweals%2Fdinit.git Update Dasynq library and API usage Hopefully for the last time, as the Dasynq API is now stable. --- diff --git a/src/control.h b/src/control.h index e3b754a..3b746f8 100644 --- a/src/control.h +++ b/src/control.h @@ -19,7 +19,7 @@ // Control connection for dinit using namespace dasynq; -using EventLoop_t = event_loop; +using EventLoop_t = event_loop; class ControlConn; class ControlConnWatcher; @@ -47,21 +47,21 @@ extern int active_control_conns; class ServiceSet; class ServiceRecord; -class ControlConnWatcher : public EventLoop_t::bidi_fd_watcher +class ControlConnWatcher : public EventLoop_t::bidi_fd_watcher_impl { inline rearm receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept; - rearm read_ready(EventLoop_t &loop, int fd) noexcept override + public: + rearm read_ready(EventLoop_t &loop, int fd) noexcept { return receiveEvent(loop, fd, IN_EVENTS); } - rearm write_ready(EventLoop_t &loop, int fd) noexcept override + rearm write_ready(EventLoop_t &loop, int fd) noexcept { return receiveEvent(loop, fd, OUT_EVENTS); } - public: EventLoop_t * eventLoop; void set_watches(int flags) diff --git a/src/dasynq/dasynq-binaryheap.h b/src/dasynq/dasynq-binaryheap.h deleted file mode 100644 index aafd53d..0000000 --- a/src/dasynq/dasynq-binaryheap.h +++ /dev/null @@ -1,256 +0,0 @@ -#ifndef DASYNC_BINARYHEAP_H_INCLUDED -#define DASYNC_BINARYHEAP_H_INCLUDED - -#include "dasynq-svec.h" -#include -#include -#include - -namespace dasynq { - -/** - * Priority queue implementation based on a binary heap. - * - * Heap entry "handles" maintain an index into the heap. When the position of a node in the heap - * changes, its handle must be updated. - * - * T : node data type - * P : priority type (eg int) - * Compare : functional object type to compare priorities - */ -template > -class BinaryHeap -{ - public: - struct handle_t; - - private: - - // Actual heap node - class HeapNode - { - public: - P data; - handle_t * hnd_p; - - HeapNode(handle_t * hnd, const P &odata) : data(odata), hnd_p(hnd) - { - // nothing to do - } - }; - - svector hvec; - - using hindex_t = typename decltype(hvec)::size_type; - - int root_node = -1; - hindex_t num_nodes = 0; - - public: - - // Handle to an element on the heap in the node buffer; also contains the data associated - // with the node. (Alternative implementation would be to store the heap data in a - // separate container, and have the handle be an index into that container). - struct handle_t - { - T hd; - hindex_t heap_index; - }; - - // Initialise a handle (if it does not have a suitable constructor). Need not do anything - // but may store a sentinel value to mark the handle as inactive. It should not be - // necessary to call this, really. - static void init_handle(handle_t &h) noexcept - { - } - - private: - - // In hindsight, I probably could have used std::priority_queue rather than re-implementing a - // queue here. However, priority_queue doesn't expose the container (except as a protected - // member) and so would make it more complicated to reserve storage. It would also have been - // necessary to override assignment between elements to correctly update the reference in the - // handle. - - bool bubble_down() - { - return bubble_down(hvec.size() - 1); - } - - // Bubble a newly added timer down to the correct position - bool bubble_down(hindex_t pos) - { - // int pos = v.size() - 1; - Compare lt; - while (pos > 0) { - hindex_t parent = (pos - 1) / 2; - if (! lt(hvec[pos].data, hvec[parent].data)) { - break; - } - - std::swap(hvec[pos], hvec[parent]); - std::swap(hvec[pos].hnd_p->heap_index, hvec[parent].hnd_p->heap_index); - pos = parent; - } - - return pos == 0; - } - - void bubble_up(hindex_t pos = 0) - { - Compare lt; - hindex_t rmax = hvec.size(); - hindex_t max = (rmax - 1) / 2; - - while (pos <= max) { - hindex_t selchild; - hindex_t lchild = pos * 2 + 1; - hindex_t rchild = lchild + 1; - if (rchild >= rmax) { - selchild = lchild; - } - else { - // select the sooner of lchild and rchild - selchild = lt(hvec[lchild].data, hvec[rchild].data) ? lchild : rchild; - } - - if (! lt(hvec[selchild].data, hvec[pos].data)) { - break; - } - - std::swap(hvec[selchild].hnd_p->heap_index, hvec[pos].hnd_p->heap_index); - std::swap(hvec[selchild], hvec[pos]); - pos = selchild; - } - } - - void remove_h(hindex_t hidx) - { - // bvec[hvec[hidx].data_index].heap_index = -1; - hvec[hidx].hnd_p->heap_index = -1; - if (hvec.size() != hidx + 1) { - // replace the first element with the last: - // bvec[hvec.back().data_index].heap_index = hidx; - hvec.back().hnd_p->heap_index = hidx; - hvec[hidx] = hvec.back(); - hvec.pop_back(); - - // Now bubble up: - bubble_up(hidx); - } - else { - hvec.pop_back(); - } - } - - public: - - T & node_data(handle_t & index) noexcept - { - return index.hd; - } - - // Allocate a slot, but do not incorporate into the heap: - // u... : parameters for data constructor T::T(...) - template void allocate(handle_t & hnd, U... u) - { - new (& hnd.hd) T(u...); - hnd.heap_index = -1; - constexpr hindex_t max_allowed = std::numeric_limits::is_signed ? - std::numeric_limits::max() : ((hindex_t) - 2); - - if (num_nodes == max_allowed) { - throw std::bad_alloc(); - } - - num_nodes++; - - if (__builtin_expect(hvec.capacity() < num_nodes, 0)) { - hindex_t half_point = max_allowed / 2; - try { - if (__builtin_expect(num_nodes < half_point, 1)) { - hvec.reserve(num_nodes * 2); - } - else { - hvec.reserve(max_allowed); - } - } - catch (std::bad_alloc &e) { - hvec.reserve(num_nodes); - } - } - } - - // Deallocate a slot - void deallocate(handle_t & index) noexcept - { - num_nodes--; - - // shrink the capacity of hvec if num_nodes is sufficiently less than - // its current capacity: - if (num_nodes < hvec.capacity() / 4) { - hvec.shrink_to(num_nodes * 2); - } - } - - bool insert(handle_t & hnd, P pval = P()) noexcept - { - hnd.heap_index = hvec.size(); - hvec.emplace_back(&hnd, pval); - return bubble_down(); - } - - // Get the root node handle. (Returns a handle_t or reference to handle_t). - handle_t & get_root() - { - return * hvec[0].hnd_p; - } - - P &get_root_priority() - { - return hvec[0].data; - } - - void pull_root() - { - remove_h(0); - } - - void remove(handle_t & hnd) - { - remove_h(hnd.heap_index); - } - - bool empty() - { - return hvec.empty(); - } - - bool is_queued(handle_t hnd) - { - return hnd.heap_index != (hindex_t) -1; - } - - // Set a node priority. Returns true iff the node becomes the root node (and wasn't before). - bool set_priority(handle_t & hnd, const P& p) - { - int heap_index = hnd.heap_index; - - Compare lt; - if (lt(hvec[heap_index].data, p)) { - // Increase key - hvec[heap_index].data = p; - bubble_up(heap_index); - return false; - } - else { - // Decrease key - hvec[heap_index].data = p; - return bubble_down(heap_index); - } - } -}; - -} - -#endif diff --git a/src/dasynq/dasynq-interrupt.h b/src/dasynq/dasynq-interrupt.h index 7a68fa1..5864220 100644 --- a/src/dasynq/dasynq-interrupt.h +++ b/src/dasynq/dasynq-interrupt.h @@ -16,7 +16,7 @@ namespace dasynq { template class interrupt_channel; // In the non-multi-thread case, this doesn't need to be supported: -template class interrupt_channel : public Base +template class interrupt_channel : public Base { public: void interrupt_wait() diff --git a/src/dasynq/dasynq-itimer.h b/src/dasynq/dasynq-itimer.h index 43833a1..f6b48ee 100644 --- a/src/dasynq/dasynq-itimer.h +++ b/src/dasynq/dasynq-itimer.h @@ -5,48 +5,43 @@ #include #include "dasynq-timerbase.h" -#include "dasynq-binaryheap.h" namespace dasynq { // Timer implementation based on the (basically obselete) POSIX itimer interface. -template class ITimerEvents : public Base +template class ITimerEvents : public timer_base { private: int timerfd_fd = -1; - BinaryHeap timer_queue; + timer_queue_t timer_queue; #if defined(__APPLE__) #define itimerspec itimerval #endif - static int divide_timespec(const struct timespec &num, const struct timespec &den) - { - // TODO - return 0; - } - // Set the timerfd timeout to match the first timer in the queue (disable the timerfd // if there are no active timers). void set_timer_from_queue() { struct itimerspec newtime; + struct itimerval newalarm; if (timer_queue.empty()) { - newtime.it_value = {0, 0}; - newtime.it_interval = {0, 0}; + newalarm.it_value = {0, 0}; + newalarm.it_interval = {0, 0}; + setitimer(ITIMER_REAL, &newalarm, nullptr); + return; } - else { + #if defined(__APPLE__) - auto &rp = timer_queue.get_root_priority(); - newtime.it_value.tv_sec = rp.tv_sec; - newtime.it_value.tv_usec = rp.tv_nsec / 1000; + auto &rp = timer_queue.get_root_priority(); + newtime.it_value.tv_sec = rp.tv_sec; + newtime.it_value.tv_usec = rp.tv_nsec / 1000; #else - newtime.it_value = timer_queue.get_root_priority(); - newtime.it_interval = {0, 0}; + newtime.it_value = timer_queue.get_root_priority(); + newtime.it_interval = {0, 0}; #endif - } struct timespec curtime; #if defined(__APPLE__) @@ -57,7 +52,6 @@ template class ITimerEvents : public Base #else clock_gettime(CLOCK_MONOTONIC, &curtime); #endif - struct itimerval newalarm; newalarm.it_interval = {0, 0}; newalarm.it_value.tv_sec = newtime.it_value.tv_sec - curtime.tv_sec; #if defined(__APPLE__) @@ -95,58 +89,11 @@ template class ITimerEvents : public Base // should we use the REALTIME clock instead? I have no idea :/ #endif - // Peek timer queue; calculate difference between current time and timeout - struct timespec * timeout = &timer_queue.get_root_priority(); - while (timeout->tv_sec < curtime.tv_sec || (timeout->tv_sec == curtime.tv_sec && - timeout->tv_nsec <= curtime.tv_nsec)) { - // Increment expiry count - timer_queue.node_data(timer_queue.get_root()).expiry_count++; - // (a periodic timer may have overrun; calculated below). - - auto thandle = timer_queue.get_root(); - TimerData &data = timer_queue.node_data(thandle); - timespec &interval = data.interval_time; - if (interval.tv_sec == 0 && interval.tv_nsec == 0) { - // Non periodic timer - timer_queue.pull_root(); - if (data.enabled) { - int expiry_count = data.expiry_count; - data.expiry_count = 0; - Base::receiveTimerExpiry(thandle, timer_queue.node_data(thandle).userdata, expiry_count); - } - if (timer_queue.empty()) { - break; - } - } - else { - // Periodic timer TODO - // First calculate the overrun in time: - /* - struct timespec diff; - diff.tv_sec = curtime.tv_sec - timeout->tv_sec; - diff.tv_nsec = curtime.tv_nsec - timeout->tv_nsec; - if (diff.tv_nsec < 0) { - diff.tv_nsec += 1000000000; - diff.tv_sec--; - } - */ - // Now we have to divide the time overrun by the period to find the - // interval overrun. This requires a division of a value not representable - // as a long... - // TODO use divide_timespec - // TODO better not to remove from queue maybe, but instead mark as inactive, - // adjust timeout, and bubble into correct position - // call Base::receieveTimerEvent - // TODO - } - - // repeat until all expired timeouts processed - // timeout = &timer_queue[0].timeout; - // (shouldn't be necessary; address hasn't changed...) - } + timer_base::process_timer_queue(timer_queue, curtime); + // arm timerfd with timeout from head of queue set_timer_from_queue(); - loop_mech.rearmSignalWatch_nolock(SIGALRM); + // loop_mech.rearmSignalWatch_nolock(SIGALRM); return false; // don't disable signal watch } else { @@ -241,7 +188,31 @@ template class ITimerEvents : public Base void enableTimer_nolock(timer_handle_t &timer_id, bool enable, clock_type = clock_type::MONOTONIC) noexcept { - timer_queue.node_data(timer_id).enabled = enable; + auto &node_data = timer_queue.node_data(timer_id); + auto expiry_count = node_data.expiry_count; + if (expiry_count != 0) { + node_data.expiry_count = 0; + Base::receiveTimerExpiry(timer_id, node_data.userdata, expiry_count); + } + else { + timer_queue.node_data(timer_id).enabled = enable; + } + } + + void stop_timer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept + { + stop_timer_nolock(timer_id, clock); + } + + void stop_timer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept + { + if (timer_queue.is_queued(timer_id)) { + bool was_first = (&timer_queue.get_root()) == &timer_id; + timer_queue.remove(timer_id); + if (was_first) { + set_timer_from_queue(); + } + } } }; diff --git a/src/dasynq/dasynq-kqueue.h b/src/dasynq/dasynq-kqueue.h index e2646a2..6493ba8 100644 --- a/src/dasynq/dasynq-kqueue.h +++ b/src/dasynq/dasynq-kqueue.h @@ -224,10 +224,13 @@ template class KqueueLoop : public Base close(kqfd); } - void setFilterEnabled(short filterType, uintptr_t ident, bool enable) + void setFilterEnabled(short filterType, uintptr_t ident, void *udata, bool enable) { + // Note, on OpenBSD enabling or disabling filter will not alter the filter parameters (udata etc); + // on OS X however, it will. Therefore we set udata here (to the same value as it was originally + // set) in order to work correctly on both kernels. struct kevent kev; - EV_SET(&kev, ident, filterType, enable ? EV_ENABLE : EV_DISABLE, 0, 0, 0); + EV_SET(&kev, ident, filterType, enable ? EV_ENABLE : EV_DISABLE, 0, 0, udata); kevent(kqfd, &kev, 1, nullptr, 0, nullptr); } @@ -290,7 +293,7 @@ template class KqueueLoop : public Base void enableFdWatch(int fd, void *userdata, int flags) { - setFilterEnabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, true); + setFilterEnabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, userdata, true); } void enableFdWatch_nolock(int fd, void *userdata, int flags) @@ -300,7 +303,7 @@ template class KqueueLoop : public Base void disableFdWatch(int fd, int flags) { - setFilterEnabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, false); + setFilterEnabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, nullptr, false); } void disableFdWatch_nolock(int fd, int flags) diff --git a/src/dasynq/dasynq-mutex.h b/src/dasynq/dasynq-mutex.h index 9cf44b3..312c22d 100644 --- a/src/dasynq/dasynq-mutex.h +++ b/src/dasynq/dasynq-mutex.h @@ -38,7 +38,7 @@ class DMutex using DMutex = std::mutex; // A "null" mutex, for which locking / unlocking actually does nothing. -class NullMutex +class null_mutex { DASYNQ_EMPTY_BODY diff --git a/src/dasynq/dasynq-naryheap.h b/src/dasynq/dasynq-naryheap.h new file mode 100644 index 0000000..4156a42 --- /dev/null +++ b/src/dasynq/dasynq-naryheap.h @@ -0,0 +1,364 @@ +#ifndef DASYNC_NARYHEAP_H_INCLUDED +#define DASYNC_NARYHEAP_H_INCLUDED + +#include "dasynq-svec.h" +#include +#include +#include + +namespace dasynq { + +/** + * Priority queue implementation based on an "Nary" heap (for power-of-2 N's). + * + * This is implemented somewhat as a heap-of-(binary)heaps. Eg with an N of 8, each mini heap has 7 elements; + * a mini-heap looks like: + * + * 0 + * 1 2 + * 3 4 5 6 + * + * There are N child mini-heaps underneath each mini-heap. + * + * Heap entry "handles" maintain an index into the heap. When the position of a node in the heap + * changes, its handle must be updated. + * + * T : node data type + * P : priority type (eg int) + * Compare : functional object type to compare priorities + */ +template , int N = 16> +class NaryHeap +{ + public: + struct handle_t; + using handle_t_r = handle_t &; + + private: + + // Actual heap node + class HeapNode + { + public: + P data; + handle_t * hnd_p; + + HeapNode(handle_t * hnd, const P &odata) : data(odata), hnd_p(hnd) + { + // nothing to do + } + + HeapNode() { } + }; + + svector hvec; + + using hindex_t = typename decltype(hvec)::size_type; + + int root_node = -1; + hindex_t num_nodes = 0; + + public: + + // Handle to an element on the heap in the node buffer; also contains the data associated + // with the node. (Alternative implementation would be to store the heap data in a + // separate container, and have the handle be an index into that container). + struct handle_t + { + T hd; + hindex_t heap_index; + }; + + // Initialise a handle (if it does not have a suitable constructor). Need not do anything + // but may store a sentinel value to mark the handle as inactive. It should not be + // necessary to call this, really. + static void init_handle(handle_t &h) noexcept + { + } + + private: + + // In hindsight, I probably could have used std::priority_queue rather than re-implementing a + // queue here. However, priority_queue doesn't expose the container (except as a protected + // member) and so would make it more complicated to reserve storage. It would also have been + // necessary to override assignment between elements to correctly update the reference in the + // handle. + + bool bubble_down() noexcept + { + return bubble_down(hvec.size() - 1); + } + + // Bubble a newly added timer down to the correct position + bool bubble_down(hindex_t pos) noexcept + { + P op = hvec[pos].data; + handle_t * ohndl = hvec[pos].hnd_p; + return bubble_down(pos, ohndl, op); + } + + bool bubble_down(hindex_t pos, handle_t * ohndl, const P &op) noexcept + { + // int pos = v.size() - 1; + Compare lt; + + while (pos > 0) { + // bubble down within micro heap: + auto mh_index = pos % (N - 1); + auto mh_base = pos - mh_index; + while (mh_index > 0) { + hindex_t parent = (mh_index - 1) / 2; + if (! lt(op, hvec[parent + mh_base].data)) { + pos = mh_index + mh_base; + hvec[pos].hnd_p = ohndl; + ohndl->heap_index = pos; + return false; + } + + hvec[mh_index + mh_base] = hvec[parent + mh_base]; + hvec[mh_index + mh_base].hnd_p->heap_index = mh_index + mh_base; + mh_index = parent; + } + + pos = mh_base; + if (pos == 0) break; + + auto containing_unit = pos / (N - 1); + auto parent_unit = (containing_unit - 1) / N; + auto rem = (containing_unit - 1) % N; + auto parent_idx = (rem / 2) + (N / 2) - 1; + + hindex_t parent = parent_unit * (N - 1) + parent_idx; + if (! lt(op, hvec[parent].data)) { + break; + } + + hvec[pos] = hvec[parent]; + hvec[pos].hnd_p->heap_index = pos; + pos = parent; + } + + hvec[pos].hnd_p = ohndl; + ohndl->heap_index = pos; + + return pos == 0; + } + + void bubble_up(hindex_t pos = 0) + { + P p = hvec[pos].data; + handle_t &hndl = *hvec[pos].hnd_p; + bubble_up(pos, hvec.size() - 1, hndl, p); + } + + void bubble_up(hindex_t pos, hindex_t rmax, handle_t &h, const P &p) noexcept + { + Compare lt; + + while (true) { + auto containing_unit = pos / (N - 1); + auto mh_index = pos % (N - 1); + auto mh_base = pos - mh_index; + + while (mh_index < (N-2) / 2) { + hindex_t selchild; + hindex_t lchild = mh_index * 2 + 1 + mh_base; + hindex_t rchild = lchild + 1; + + if (rchild >= rmax) { + if (lchild > rmax) { + hvec[pos].hnd_p = &h; + hvec[pos].data = p; + h.heap_index = pos; + return; + } + selchild = lchild; + } + else { + // select the sooner of lchild and rchild + selchild = lt(hvec[lchild].data, hvec[rchild].data) ? lchild : rchild; + } + + if (! lt(hvec[selchild].data, p)) { + hvec[pos].hnd_p = &h; + hvec[pos].data = p; + h.heap_index = pos; + return; + } + + hvec[pos] = hvec[selchild]; + hvec[pos].hnd_p->heap_index = pos; + pos = selchild; + mh_index = pos - mh_base; + } + + auto left_unit = containing_unit * N + (mh_index - (N/2 - 1)) * 2 + 1; + // auto right_unit = left_unit + 1; + + hindex_t selchild; + hindex_t lchild = left_unit * (N - 1); + hindex_t rchild = lchild + (N - 1); + if (rchild >= rmax) { + if (lchild > rmax) { + break; + } + selchild = lchild; + } + else { + // select the sooner of lchild and rchild + selchild = lt(hvec[lchild].data, hvec[rchild].data) ? lchild : rchild; + } + + if (! lt(hvec[selchild].data, p)) { + break; + } + + hvec[pos] = hvec[selchild]; + hvec[pos].hnd_p->heap_index = pos; + pos = selchild; + } + + hvec[pos].hnd_p = &h; + hvec[pos].data = p; + h.heap_index = pos; + } + + void remove_h(hindex_t hidx) + { + // bvec[hvec[hidx].data_index].heap_index = -1; + hvec[hidx].hnd_p->heap_index = -1; + if (hvec.size() != hidx + 1) { + // replace the first element with the last: + // bvec[hvec.back().data_index].heap_index = hidx; + + //hvec.back().hnd_p->heap_index = hidx; + //hvec[hidx] = hvec.back(); + //hvec.pop_back(); + + // Now bubble up: + //bubble_up(hidx); + + bubble_up(hidx, hvec.size() - 2, *(hvec.back().hnd_p), hvec.back().data); + hvec.pop_back(); + } + else { + hvec.pop_back(); + } + } + + public: + + T & node_data(handle_t & index) noexcept + { + return index.hd; + } + + // Allocate a slot, but do not incorporate into the heap: + // u... : parameters for data constructor T::T(...) + template void allocate(handle_t & hnd, U... u) + { + new (& hnd.hd) T(u...); + hnd.heap_index = -1; + constexpr hindex_t max_allowed = std::numeric_limits::is_signed ? + std::numeric_limits::max() : ((hindex_t) - 2); + + if (num_nodes == max_allowed) { + throw std::bad_alloc(); + } + + num_nodes++; + + if (__builtin_expect(hvec.capacity() < num_nodes, 0)) { + hindex_t half_point = max_allowed / 2; + try { + if (__builtin_expect(num_nodes < half_point, 1)) { + hvec.reserve(num_nodes * 2); + } + else { + hvec.reserve(max_allowed); + } + } + catch (std::bad_alloc &e) { + hvec.reserve(num_nodes); + } + } + } + + // Deallocate a slot + void deallocate(handle_t & index) noexcept + { + num_nodes--; + + // shrink the capacity of hvec if num_nodes is sufficiently less than + // its current capacity: + if (num_nodes < hvec.capacity() / 4) { + hvec.shrink_to(num_nodes * 2); + } + } + + bool insert(handle_t & hnd) noexcept + { + P pval = P(); + return insert(hnd, pval); + } + + bool insert(handle_t & hnd, const P & pval) noexcept + { + hvec.emplace_back(&hnd, pval); + return bubble_down(hvec.size() - 1, &hnd, pval); + } + + // Get the root node handle. (Returns a handle_t or reference to handle_t). + handle_t & get_root() + { + return * hvec[0].hnd_p; + } + + P &get_root_priority() + { + return hvec[0].data; + } + + void pull_root() + { + remove_h(0); + } + + void remove(handle_t & hnd) + { + remove_h(hnd.heap_index); + } + + bool empty() + { + return hvec.empty(); + } + + bool is_queued(handle_t hnd) + { + return hnd.heap_index != (hindex_t) -1; + } + + // Set a node priority. Returns true iff the node becomes the root node (and wasn't before). + bool set_priority(handle_t & hnd, const P& p) + { + int heap_index = hnd.heap_index; + + Compare lt; + if (lt(hvec[heap_index].data, p)) { + // Increase key + hvec[heap_index].data = p; + bubble_up(heap_index); + return false; + } + else { + // Decrease key + hvec[heap_index].data = p; + return bubble_down(heap_index); + } + } +}; + +} + +#endif diff --git a/src/dasynq/dasynq-svec.h b/src/dasynq/dasynq-svec.h index cddec09..9ff09d0 100644 --- a/src/dasynq/dasynq-svec.h +++ b/src/dasynq/dasynq-svec.h @@ -156,6 +156,16 @@ class svector { return array[size_v - 1]; } + + T* begin() const + { + return array; + } + + T* end() const + { + return array + size_v; + } }; diff --git a/src/dasynq/dasynq-timerbase.h b/src/dasynq/dasynq-timerbase.h index 7e7720a..bc1b91c 100644 --- a/src/dasynq/dasynq-timerbase.h +++ b/src/dasynq/dasynq-timerbase.h @@ -1,6 +1,8 @@ #ifndef DASYNQ_TIMERBASE_H_INCLUDED #define DASYNQ_TIMERBASE_H_INCLUDED +#include "dasynq-naryheap.h" + namespace dasynq { class TimerData @@ -34,13 +36,175 @@ class CompareTimespec } }; -using timer_handle_t = BinaryHeap::handle_t; +using timer_queue_t = NaryHeap; +using timer_handle_t = timer_queue_t::handle_t; -static void init_timer_handle(timer_handle_t &hnd) noexcept +static inline void init_timer_handle(timer_handle_t &hnd) noexcept { - BinaryHeap::init_handle(hnd); + timer_queue_t::init_handle(hnd); } +static inline int divide_timespec(const struct timespec &num, const struct timespec &den, struct timespec &rem) +{ + if (num.tv_sec < den.tv_sec) { + rem = num; + return 0; + } + + if (num.tv_sec == den.tv_sec) { + if (num.tv_nsec < den.tv_nsec) { + rem = num; + return 0; + } + if (num.tv_sec == 0) { + rem.tv_sec = 0; + rem.tv_nsec = num.tv_nsec % den.tv_nsec; + return num.tv_nsec / den.tv_nsec; + } + // num.tv_sec == den.tv_sec and both are >= 1. + // The result can only be 1: + rem.tv_sec = 0; + rem.tv_nsec = num.tv_nsec - den.tv_nsec; + return 1; + } + + // At this point, num.tv_sec >= 1. + + auto &r_sec = rem.tv_sec; + auto &r_nsec = rem.tv_nsec; + r_sec = num.tv_sec; + r_nsec = num.tv_nsec; + auto d_sec = den.tv_sec; + auto d_nsec = den.tv_nsec; + + r_sec -= d_sec; + if (r_nsec >= d_nsec) { + r_nsec -= d_nsec; + } + else { + r_nsec += (1000000000ULL - d_nsec); + r_sec -= 1; + } + + // Check now for common case: one timer expiry with no overrun + if (r_sec < d_sec || (r_sec == d_sec && r_nsec < d_nsec)) { + return 1; + } + + int nval = 1; + int rval = 1; // we have subtracted 1*D already + + // shift denominator until it is greater than/equal to numerator: + while (d_sec < r_sec) { + d_sec *= 2; + d_nsec *= 2; + if (d_nsec >= 1000000000) { + d_nsec -= 1000000000; + d_sec++; + } + nval *= 2; + } + + while (nval > 0) { + if (d_sec < r_sec || (d_sec == r_sec && d_nsec <= r_nsec)) { + // subtract: + r_sec -= d_sec; + if (d_nsec > r_nsec) { + r_nsec += 1000000000; + r_sec--; + } + r_nsec -= d_nsec; + + rval += nval; + } + + bool low = d_sec & 1; + d_nsec /= 2; + d_nsec += low ? 500000000ULL : 0; + d_sec /= 2; + nval /= 2; + } + + return rval; +} + +template class timer_base : public Base +{ + protected: + + void process_timer_queue(timer_queue_t &queue, const struct timespec &curtime) + { + // Peek timer queue; calculate difference between current time and timeout + const struct timespec * timeout = &queue.get_root_priority(); + while (timeout->tv_sec < curtime.tv_sec || (timeout->tv_sec == curtime.tv_sec && + timeout->tv_nsec <= curtime.tv_nsec)) { + auto & thandle = queue.get_root(); + TimerData &data = queue.node_data(thandle); + timespec &interval = data.interval_time; + data.expiry_count++; + queue.pull_root(); + if (interval.tv_sec == 0 && interval.tv_nsec == 0) { + // Non periodic timer + if (data.enabled) { + data.enabled = false; + int expiry_count = data.expiry_count; + data.expiry_count = 0; + Base::receiveTimerExpiry(thandle, data.userdata, expiry_count); + } + if (queue.empty()) { + break; + } + } + else { + // First calculate the overrun in time: + struct timespec diff; + diff.tv_sec = curtime.tv_sec - timeout->tv_sec; + if (curtime.tv_nsec > timeout->tv_nsec) { + diff.tv_nsec = curtime.tv_nsec - timeout->tv_nsec; + } + else { + diff.tv_nsec = 1000000000 - timeout->tv_nsec + curtime.tv_nsec; + diff.tv_sec--; + } + + // Now we have to divide the time overrun by the period to find the + // interval overrun. This requires a division of a value not representable + // as a long... + struct timespec rem; + data.expiry_count += divide_timespec(diff, interval, rem); + + // new time is current time + interval - remainder: + struct timespec newtime = curtime; + newtime.tv_sec += interval.tv_sec; + newtime.tv_nsec += interval.tv_nsec; + if (newtime.tv_nsec > 1000000000) { + newtime.tv_nsec -= 1000000000; + newtime.tv_sec++; + } + newtime.tv_sec -= rem.tv_sec; + if (rem.tv_nsec > newtime.tv_nsec) { + newtime.tv_nsec += 1000000000 - rem.tv_nsec; + newtime.tv_sec--; + } + else { + newtime.tv_nsec -= rem.tv_nsec; + } + + queue.insert(thandle, newtime); + if (data.enabled) { + data.enabled = false; + int expiry_count = data.expiry_count; + data.expiry_count = 0; + Base::receiveTimerExpiry(thandle, data.userdata, expiry_count); + } + } + + // repeat until all expired timeouts processed + timeout = &queue.get_root_priority(); + } + } +}; + } #endif /* DASYNQ_TIMERBASE_H_INCLUDED */ diff --git a/src/dasynq/dasynq-timerfd.h b/src/dasynq/dasynq-timerfd.h index f31cdc3..a73e92e 100644 --- a/src/dasynq/dasynq-timerfd.h +++ b/src/dasynq/dasynq-timerfd.h @@ -5,7 +5,6 @@ #include #include "dasynq-timerbase.h" -#include "dasynq-binaryheap.h" namespace dasynq { @@ -18,22 +17,15 @@ namespace dasynq { // we are given a handle; we need to use this to modify the watch. We delegate the // process of allocating a handle to a priority heap implementation (BinaryHeap). -template class TimerFdEvents : public Base +template class TimerFdEvents : public timer_base { private: int timerfd_fd = -1; int systemtime_fd = -1; - using timer_queue_t = BinaryHeap; timer_queue_t timer_queue; timer_queue_t wallclock_queue; - static int divide_timespec(const struct timespec &num, const struct timespec &den) noexcept - { - // TODO - return 0; - } - // Set the timerfd timeout to match the first timer in the queue (disable the timerfd // if there are no active timers). static void set_timer_from_queue(int fd, timer_queue_t &queue) noexcept @@ -64,55 +56,8 @@ template class TimerFdEvents : public Base DASYNQ_UNREACHABLE; } - // Peek timer queue; calculate difference between current time and timeout - struct timespec * timeout = &queue.get_root_priority(); - while (timeout->tv_sec < curtime.tv_sec || (timeout->tv_sec == curtime.tv_sec && - timeout->tv_nsec <= curtime.tv_nsec)) { - // Increment expiry count - queue.node_data(queue.get_root()).expiry_count++; - // (a periodic timer may have overrun; calculated below). + timer_base::process_timer_queue(queue, curtime); - auto thandle = queue.get_root(); - TimerData &data = queue.node_data(thandle); - timespec &interval = data.interval_time; - if (interval.tv_sec == 0 && interval.tv_nsec == 0) { - // Non periodic timer - queue.pull_root(); - if (data.enabled) { - int expiry_count = data.expiry_count; - data.expiry_count = 0; - Base::receiveTimerExpiry(thandle, data.userdata, expiry_count); - } - if (queue.empty()) { - break; - } - } - else { - // Periodic timer TODO - // First calculate the overrun in time: - /* - struct timespec diff; - diff.tv_sec = curtime.tv_sec - timeout->tv_sec; - diff.tv_nsec = curtime.tv_nsec - timeout->tv_nsec; - if (diff.tv_nsec < 0) { - diff.tv_nsec += 1000000000; - diff.tv_sec--; - } - */ - // Now we have to divide the time overrun by the period to find the - // interval overrun. This requires a division of a value not representable - // as a long... - // TODO use divide_timespec - // TODO better not to remove from queue maybe, but instead mark as inactive, - // adjust timeout, and bubble into correct position - // call Base::receieveTimerEvent - // TODO - } - - // repeat until all expired timeouts processed - // timeout = &timer_queue[0].timeout; - // (shouldn't be necessary; address hasn't changed...) - } // arm timerfd with timeout from head of queue set_timer_from_queue(fd, queue); } @@ -140,6 +85,18 @@ template class TimerFdEvents : public Base // TODO locking (here and everywhere) } + timer_queue_t & get_queue(clock_type clock) + { + switch(clock) { + case clock_type::SYSTEM: + return wallclock_queue; + case clock_type::MONOTONIC: + return timer_queue; + default: + DASYNQ_UNREACHABLE; + } + } + public: template void receiveFdEvent(T &loop_mech, typename Base::FD_r fd_r, void * userdata, int flags) @@ -182,16 +139,8 @@ template class TimerFdEvents : public Base // Add timer, store into given handle void addTimer(timer_handle_t &h, void *userdata, clock_type clock = clock_type::MONOTONIC) { - switch(clock) { - case clock_type::SYSTEM: - wallclock_queue.allocate(h, userdata); - break; - case clock_type::MONOTONIC: - timer_queue.allocate(h, userdata); - break; - default: - DASYNQ_UNREACHABLE; - } + timer_queue_t & queue = get_queue(clock); + queue.allocate(h, userdata); } void removeTimer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept @@ -201,24 +150,26 @@ template class TimerFdEvents : public Base void removeTimer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept { - switch(clock) { - case clock_type::SYSTEM: - if (wallclock_queue.is_queued(timer_id)) { - wallclock_queue.remove(timer_id); - } - wallclock_queue.deallocate(timer_id); - break; - case clock_type::MONOTONIC: - if (timer_queue.is_queued(timer_id)) { - timer_queue.remove(timer_id); - } - timer_queue.deallocate(timer_id); - break; - default: - DASYNQ_UNREACHABLE; + timer_queue_t & queue = get_queue(clock); + if (queue.is_queued(timer_id)) { + queue.remove(timer_id); } + queue.deallocate(timer_id); } - + + void stop_timer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept + { + stop_timer_nolock(timer_id, clock); + } + + void stop_timer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept + { + timer_queue_t & queue = get_queue(clock); + if (queue.is_queued(timer_id)) { + queue.remove(timer_id); + } + } + // starts (if not started) a timer to timeout at the given time. Resets the expiry count to 0. // enable: specifies whether to enable reporting of timeouts/intervals void setTimer(timer_handle_t & timer_id, struct timespec &timeout, struct timespec &interval, @@ -273,15 +224,16 @@ template class TimerFdEvents : public Base void enableTimer_nolock(timer_handle_t & timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept { - switch (clock) { - case clock_type::SYSTEM: - wallclock_queue.node_data(timer_id).enabled = enable; - break; - case clock_type::MONOTONIC: - timer_queue.node_data(timer_id).enabled = enable; - break; - default: - DASYNQ_UNREACHABLE; + timer_queue_t & queue = get_queue(clock); + + auto &node_data = queue.node_data(timer_id); + auto expiry_count = node_data.expiry_count; + if (expiry_count != 0) { + node_data.expiry_count = 0; + Base::receiveTimerExpiry(timer_id, node_data.userdata, expiry_count); + } + else { + queue.node_data(timer_id).enabled = enable; } } diff --git a/src/dasynq/dasynq.h b/src/dasynq/dasynq.h index 0232876..1864273 100644 --- a/src/dasynq/dasynq.h +++ b/src/dasynq/dasynq.h @@ -4,9 +4,38 @@ #include "dasynq-config.h" #include "dasynq-flags.h" -#include "dasynq-binaryheap.h" +#include "dasynq-naryheap.h" #include "dasynq-interrupt.h" +// Dasynq uses a "mix-in" pattern to produce an event loop implementation incorporating selectable implementations of +// various components (main backend, timers, child process watch mechanism etc). In C++ this can be achieved by +// a template for some component which extends its own type parameter: +// +// template class X : public B { .... } +// +// We can chain several such components together (and so so below) to "mix in" the functionality of each into the final +// class, eg: +// +// template using Loop = EpollLoop>>>; +// +// (which defines an alias template "Loop", whose implementation will use the epoll backend, a standard interrupt channel +// implementation, a timerfd-based timer implementation, and the standard child process watch implementation). +// We sometimes need the base class to be able to call derived-class members: to do this we pass a reference to +// the derived instance into a templated method in the base, called "init": +// +// template void init(T *derived) +// { +// // can call method on derived: +// derived->add_listener(); +// // chain to next class: +// Base::init(derived); +// } +// +// At the base all this is the EventDispatch class, defined below, which receives event notifications and inserts +// them into a queue for processing. The event_loop class, also below, wraps this (via composition) in an interface +// which can be used to register/de-regsiter/enable/disable event watchers, and which can process the queued events +// by calling the watcher callbacks. The event_loop class also provides some synchronisation to ensure thread-safety. + #if defined(DASYNQ_CUSTOM_LOOP_IMPLEMENTATION) // Loop and LoopTraits defined already; used for testing #elif defined(DASYNQ_HAVE_KQUEUE) @@ -65,7 +94,7 @@ namespace dprivate { class BaseWatcher; } -using PrioQueue = BinaryHeap; +using PrioQueue = NaryHeap; inline namespace { constexpr int DEFAULT_PRIORITY = 50; @@ -102,6 +131,12 @@ namespace dprivate { template class child_proc_watcher; template class timer; + template class fd_watcher_impl; + template class bidi_fd_watcher_impl; + template class signal_watcher_impl; + template class child_proc_watcher_impl; + template class timer_impl; + enum class WatchType { SIGNAL, @@ -150,6 +185,9 @@ namespace dprivate { BaseWatcher(WatchType wt) noexcept : watchType(wt) { } + virtual void dispatch(void *loop_ptr) noexcept { }; + virtual void dispatch_second(void *loop_ptr) noexcept { } + virtual ~BaseWatcher() noexcept { } // Called when the watcher has been removed. @@ -175,8 +213,8 @@ namespace dprivate { BaseSignalWatcher() : BaseWatcher(WatchType::SIGNAL) { } public: - using SigInfo = typename Traits::SigInfo; - typedef SigInfo &SigInfo_p; + using siginfo_t = typename Traits::SigInfo; + typedef siginfo_t &siginfo_p; }; template @@ -210,7 +248,7 @@ namespace dprivate { // The main instance is the "input" watcher only; we keep a secondary watcher // with a secondary set of flags for the "output" watcher: - BaseWatcher outWatcher = BaseWatcher(WatchType::SECONDARYFD); + BaseWatcher outWatcher {WatchType::SECONDARYFD}; int read_removed : 1; // read watch removed? int write_removed : 1; // write watch removed? @@ -239,6 +277,7 @@ namespace dprivate { protected: timer_handle_t timer_handle; int intervals; + clock_type clock; BaseTimerWatcher() : BaseWatcher(WatchType::TIMER) { @@ -253,7 +292,7 @@ namespace dprivate { template class waitqueue; template class waitqueue_node; - // Select an appropriate conditiona variable type for a mutex: + // Select an appropriate condition variable type for a mutex: // condition_variable if mutex is std::mutex, or condition_variable_any // otherwise. template class condvarSelector; @@ -270,13 +309,13 @@ namespace dprivate { typedef std::condition_variable_any condvar; }; - template <> class waitqueue_node + template <> class waitqueue_node { - // Specialised waitqueue_node for NullMutex. - friend class waitqueue; + // Specialised waitqueue_node for null_mutex. + friend class waitqueue; public: - void wait(std::unique_lock &ul) { } + void wait(std::unique_lock &ul) { } void signal() { } DASYNQ_EMPTY_BODY; @@ -300,20 +339,20 @@ namespace dprivate { } }; - template <> class waitqueue + template <> class waitqueue { public: - waitqueue_node * unqueue() + waitqueue_node * unqueue() { return nullptr; } - waitqueue_node * getHead() + waitqueue_node * getHead() { return nullptr; } - bool checkHead(waitqueue_node &node) + bool checkHead(waitqueue_node &node) { return true; } @@ -323,7 +362,7 @@ namespace dprivate { return true; } - void queue(waitqueue_node *node) + void queue(waitqueue_node *node) { } }; @@ -445,7 +484,7 @@ namespace dprivate { if (is_multi_watch) { BaseBidiFdWatcher *bbdw = static_cast(bwatcher); bbdw->watch_flags &= ~flags; - if (flags & IN_EVENTS && flags & OUT_EVENTS) { + if ((flags & IN_EVENTS) && (flags & OUT_EVENTS)) { // Queue the secondary watcher first: queueWatcher(&bbdw->outWatcher); } @@ -491,7 +530,7 @@ namespace dprivate { return nullptr; } - auto rhndl = event_queue.get_root(); + auto & rhndl = event_queue.get_root(); BaseWatcher *r = event_queue.node_data(rhndl); event_queue.pull_root(); return r; @@ -572,6 +611,12 @@ class event_loop friend class dprivate::child_proc_watcher; friend class dprivate::timer; + template friend class dprivate::fd_watcher_impl; + template friend class dprivate::bidi_fd_watcher_impl; + template friend class dprivate::signal_watcher_impl; + template friend class dprivate::child_proc_watcher_impl; + template friend class dprivate::timer_impl; + public: using loop_traits_t = LoopTraits; @@ -840,6 +885,11 @@ class event_loop loop_mech.setTimerRel(callBack->timer_handle, timeout, interval, true, clock); } + void stop_timer(BaseTimerWatcher *callback, clock_type clock) + { + loop_mech.stop_timer(callback->timer_handle, clock); + } + void deregister(BaseTimerWatcher *callback, clock_type clock) { loop_mech.removeTimer(callback->timer_handle, clock); @@ -952,7 +1002,7 @@ class event_loop } } else if (rearmType == rearm::DISARM) { - // Nothing more to do + // TODO should actually disarm. } else if (rearmType == rearm::REARM) { bdfw->watch_flags |= IN_EVENTS; @@ -1011,7 +1061,7 @@ class event_loop } } else if (rearmType == rearm::DISARM) { - // Nothing more to do + // TODO actually disarm. } else if (rearmType == rearm::REARM) { bdfw->watch_flags |= OUT_EVENTS; @@ -1035,11 +1085,12 @@ class event_loop { // Called with lock held if (rearmType == rearm::REARM) { - loop_mech.enableTimer_nolock(btw->timer_handle, true); + loop_mech.enableTimer_nolock(btw->timer_handle, true, btw->clock); } else if (rearmType == rearm::REMOVE) { - loop_mech.removeTimer_nolock(btw->timer_handle); + loop_mech.removeTimer_nolock(btw->timer_handle, btw->clock); } + // TODO DISARM? } bool processEvents() noexcept @@ -1048,7 +1099,7 @@ class event_loop ed.lock.lock(); // So this pulls *all* currently pending events and processes them in the current thread. - // That's probably good for throughput, but maybe the behavior should be configurable. + // That's probably good for throughput, but maybe the behaviour should be configurable. BaseWatcher * pqueue = ed.pullEvent(); bool active = false; @@ -1058,115 +1109,23 @@ class event_loop pqueue->active = true; active = true; - rearm rearmType = rearm::NOOP; - bool is_multi_watch = false; BaseBidiFdWatcher *bbfw = nullptr; // (Above variables are initialised only to silence compiler warnings). - // Read/manipulate watch_flags (if necessary) *before* we release the lock: - if (pqueue->watchType == WatchType::FD) { - BaseFdWatcher *bfw = static_cast(pqueue); - bbfw = static_cast(bfw); - is_multi_watch = bfw->watch_flags & dprivate::multi_watch; - if (! LoopTraits::has_separate_rw_fd_watches && is_multi_watch) { - // Clear the input watch flags to avoid enabling read watcher while active: - bfw->watch_flags &= ~IN_EVENTS; - } - } - else if (pqueue->watchType == WatchType::SECONDARYFD) { - is_multi_watch = true; + if (pqueue->watchType == WatchType::SECONDARYFD) { // construct a pointer to the main watcher: char * rp = (char *)pqueue; rp -= offsetof(BaseBidiFdWatcher, outWatcher); bbfw = (BaseBidiFdWatcher *)rp; - if (! LoopTraits::has_separate_rw_fd_watches) { - bbfw->watch_flags &= ~OUT_EVENTS; - } - } - - ed.lock.unlock(); - - // Note that we select actions based on the type of the watch, as determined by the watchType - // member. In some ways this screams out for polmorphism; a virtual function could be overridden - // by each of the watcher types. I've instead used switch/case because I think it will perform - // slightly better without the overhead of a virtual function dispatch, but it's got to be a - // close call; I might be guilty of premature optimisation here. - - switch (pqueue->watchType) { - case WatchType::SIGNAL: { - BaseSignalWatcher *bsw = static_cast(pqueue); - rearmType = ((signal_watcher *)bsw)->received(*this, bsw->siginfo.get_signo(), bsw->siginfo); - break; - } - case WatchType::FD: { - BaseFdWatcher *bfw = static_cast(pqueue); - if (is_multi_watch) { - // The primary watcher for a multi-watch watcher is queued for - // read events. - rearmType = ((bidi_fd_watcher *)bbfw)->read_ready(*this, bfw->watch_fd); - bbfw->event_flags &= ~IN_EVENTS; - } - else { - rearmType = ((fd_watcher *)bfw)->fd_event(*this, bfw->watch_fd, bfw->event_flags); - bfw->event_flags = 0; - } - break; - } - case WatchType::CHILD: { - BaseChildWatcher *bcw = static_cast(pqueue); - rearmType = ((child_proc_watcher *)bcw)->child_status(*this, bcw->watch_pid, bcw->child_status); - break; - } - case WatchType::SECONDARYFD: { - rearmType = ((bidi_fd_watcher *)bbfw)->write_ready(*this, bbfw->watch_fd); - bbfw->event_flags &= ~OUT_EVENTS; - break; - } - case WatchType::TIMER: { - BaseTimerWatcher *btw = static_cast(pqueue); - rearmType = ((timer *)btw)->timer_expiry(*this, btw->intervals); - break; - } - default: ; - } - ed.lock.lock(); - - // (if REMOVED, we must not touch pqueue at all) - if (rearmType != rearm::REMOVED) { - - pqueue->active = false; - if (pqueue->deleteme) { - // We don't want a watch that is marked "deleteme" to re-arm itself. - rearmType = rearm::REMOVE; - } - switch (pqueue->watchType) { - case WatchType::SIGNAL: - processSignalRearm(static_cast(pqueue), rearmType); - break; - case WatchType::FD: - rearmType = processFdRearm(static_cast(pqueue), rearmType, is_multi_watch); - break; - case WatchType::SECONDARYFD: - rearmType = processSecondaryRearm(bbfw, rearmType); - break; - case WatchType::TIMER: - processTimerRearm(static_cast(pqueue), rearmType); - break; - default: ; - } - - if (rearmType == rearm::REMOVE) { - ed.lock.unlock(); - // Note that for BidiFd watches, watch_removed is only called on the primary watch. - // The process function called above only returns Rearm::REMOVE if both primary and - // secondary watches have been removed. - (is_multi_watch ? bbfw : pqueue)->watch_removed(); - ed.lock.lock(); - } + // issue a secondary dispatch: + bbfw->dispatch_second(this); + pqueue = ed.pullEvent(); + continue; } - + + pqueue->dispatch(this); pqueue = ed.pullEvent(); } @@ -1184,6 +1143,12 @@ class event_loop using child_proc_watcher = dprivate::child_proc_watcher; using timer = dprivate::timer; + template using fd_watcher_impl = dprivate::fd_watcher_impl; + template using bidi_fd_watcher_impl = dprivate::bidi_fd_watcher_impl; + template using signal_watcher_impl = dprivate::signal_watcher_impl; + template using child_proc_watcher_impl = dprivate::child_proc_watcher_impl; + template using timer_impl = dprivate::timer_impl; + void run() noexcept { while (! processEvents()) { @@ -1202,7 +1167,7 @@ class event_loop } }; -typedef event_loop NEventLoop; +typedef event_loop NEventLoop; typedef event_loop TEventLoop; // from dasync.cc: @@ -1214,11 +1179,13 @@ namespace dprivate { template class signal_watcher : private dprivate::BaseSignalWatcher { + template friend class signal_watcher_impl; + using BaseWatcher = dprivate::BaseWatcher; using T_Mutex = typename EventLoop::mutex_t; -public: - using SigInfo_p = typename dprivate::BaseSignalWatcher::SigInfo_p; + public: + using siginfo_p = typename dprivate::BaseSignalWatcher::siginfo_p; // Register this watcher to watch the specified signal. // If an attempt is made to register with more than one event loop at @@ -1240,7 +1207,7 @@ public: template static signal_watcher *add_watch(EventLoop &eloop, int signo, T watchHndlr) { - class LambdaSigWatcher : public signal_watcher + class LambdaSigWatcher : public signal_watcher_impl { private: T watchHndlr; @@ -1251,7 +1218,7 @@ public: // } - rearm received(EventLoop &eloop, int signo, SigInfo_p siginfo) override + rearm received(EventLoop &eloop, int signo, siginfo_p siginfo) { return watchHndlr(eloop, signo, siginfo); } @@ -1267,13 +1234,46 @@ public: return lsw; } - virtual rearm received(EventLoop &eloop, int signo, SigInfo_p siginfo) = 0; + // virtual rearm received(EventLoop &eloop, int signo, siginfo_p siginfo) = 0; +}; + +template +class signal_watcher_impl : public signal_watcher +{ + void dispatch(void *loop_ptr) noexcept override + { + EventLoop &loop = *static_cast(loop_ptr); + loop.getBaseLock().unlock(); + + auto rearmType = static_cast(this)->received(loop, this->siginfo.get_signo(), this->siginfo); + + loop.getBaseLock().lock(); + + if (rearmType != rearm::REMOVED) { + + this->active = false; + if (this->deleteme) { + // We don't want a watch that is marked "deleteme" to re-arm itself. + rearmType = rearm::REMOVE; + } + + loop.processSignalRearm(this, rearmType); + + if (rearmType == rearm::REMOVE) { + loop.getBaseLock().unlock(); + this->watch_removed(); + loop.getBaseLock().lock(); + } + } + } }; // Posix file descriptor event watcher template class fd_watcher : private dprivate::BaseFdWatcher { + template friend class fd_watcher_impl; + using BaseWatcher = dprivate::BaseWatcher; using T_Mutex = typename EventLoop::mutex_t; @@ -1344,22 +1344,22 @@ class fd_watcher : private dprivate::BaseFdWatcher template static fd_watcher *add_watch(EventLoop &eloop, int fd, int flags, T watchHndlr) { - class LambdaFdWatcher : public fd_watcher + class LambdaFdWatcher : public fd_watcher_impl { private: T watchHndlr; - + public: LambdaFdWatcher(T watchHandlr_a) : watchHndlr(watchHandlr_a) { // } - - rearm fd_event(EventLoop &eloop, int fd, int flags) override + + rearm fd_event(EventLoop &eloop, int fd, int flags) { return watchHndlr(eloop, fd, flags); } - + void watch_removed() noexcept override { delete this; @@ -1371,15 +1371,49 @@ class fd_watcher : private dprivate::BaseFdWatcher return lfd; } - virtual rearm fd_event(EventLoop &eloop, int fd, int flags) = 0; + // virtual rearm fd_event(EventLoop &eloop, int fd, int flags) = 0; +}; + +template +class fd_watcher_impl : public fd_watcher +{ + void dispatch(void *loop_ptr) noexcept override + { + EventLoop &loop = *static_cast(loop_ptr); + loop.getBaseLock().unlock(); + + auto rearmType = static_cast(this)->fd_event(loop, this->watch_fd, this->event_flags); + + loop.getBaseLock().lock(); + + if (rearmType != rearm::REMOVED) { + this->event_flags = 0; + this->active = false; + if (this->deleteme) { + // We don't want a watch that is marked "deleteme" to re-arm itself. + rearmType = rearm::REMOVE; + } + + rearmType = loop.processFdRearm(this, rearmType, false); + + if (rearmType == rearm::REMOVE) { + loop.getBaseLock().unlock(); + this->watch_removed(); + loop.getBaseLock().lock(); + } + } + } }; + // A Bi-directional file descriptor watcher with independent read- and write- channels. // This watcher type has two event notification methods which can both potentially be // active at the same time. template class bidi_fd_watcher : private dprivate::BaseBidiFdWatcher { + template friend class bidi_fd_watcher_impl; + using BaseWatcher = dprivate::BaseWatcher; using T_Mutex = typename EventLoop::mutex_t; @@ -1491,14 +1525,74 @@ class bidi_fd_watcher : private dprivate::BaseBidiFdWatcherwatch_fd); } - virtual rearm read_ready(EventLoop &eloop, int fd) noexcept = 0; - virtual rearm write_ready(EventLoop &eloop, int fd) noexcept = 0; + // virtual rearm read_ready(EventLoop &eloop, int fd) noexcept = 0; + // virtual rearm write_ready(EventLoop &eloop, int fd) noexcept = 0; +}; + +template +class bidi_fd_watcher_impl : public bidi_fd_watcher +{ + void dispatch(void *loop_ptr) noexcept override + { + EventLoop &loop = *static_cast(loop_ptr); + loop.getBaseLock().unlock(); + + auto rearmType = static_cast(this)->read_ready(loop, this->watch_fd); + + loop.getBaseLock().lock(); + + if (rearmType != rearm::REMOVED) { + this->event_flags &= ~IN_EVENTS; + this->active = false; + if (this->deleteme) { + // We don't want a watch that is marked "deleteme" to re-arm itself. + rearmType = rearm::REMOVE; + } + + rearmType = loop.processFdRearm(this, rearmType, true); + + if (rearmType == rearm::REMOVE) { + loop.getBaseLock().unlock(); + this->watch_removed(); + loop.getBaseLock().lock(); + } + } + } + + void dispatch_second(void *loop_ptr) noexcept override + { + EventLoop &loop = *static_cast(loop_ptr); + loop.getBaseLock().unlock(); + + auto rearmType = static_cast(this)->write_ready(loop, this->watch_fd); + + loop.getBaseLock().lock(); + + if (rearmType != rearm::REMOVED) { + this->event_flags &= ~OUT_EVENTS; + this->active = false; + if (this->deleteme) { + // We don't want a watch that is marked "deleteme" to re-arm itself. + rearmType = rearm::REMOVE; + } + + rearmType = loop.processSecondaryRearm(this, rearmType); + + if (rearmType == rearm::REMOVE) { + loop.getBaseLock().unlock(); + this->watch_removed(); + loop.getBaseLock().lock(); + } + } + } }; // Child process event watcher template class child_proc_watcher : private dprivate::BaseChildWatcher { + template friend class child_proc_watcher_impl; + using BaseWatcher = dprivate::BaseWatcher; using T_Mutex = typename EventLoop::mutex_t; @@ -1630,14 +1724,45 @@ class child_proc_watcher : private dprivate::BaseChildWatcher +class child_proc_watcher_impl : public child_proc_watcher +{ + void dispatch(void *loop_ptr) noexcept override + { + EventLoop &loop = *static_cast(loop_ptr); + loop.getBaseLock().unlock(); + + auto rearmType = static_cast(this)->child_status(loop, this->watch_pid, this->child_status); + + loop.getBaseLock().lock(); + + if (rearmType != rearm::REMOVED) { + + this->active = false; + if (this->deleteme) { + // We don't want a watch that is marked "deleteme" to re-arm itself. + rearmType = rearm::REMOVE; + } + + // rearmType = loop.process??; + + if (rearmType == rearm::REMOVE) { + loop.getBaseLock().unlock(); + this->watch_removed(); + loop.getBaseLock().lock(); + } + } + } }; template class timer : private BaseTimerWatcher { - private: - clock_type clock; + template friend class timer_impl; + using base_t = BaseTimerWatcher; public: @@ -1650,34 +1775,70 @@ class timer : private BaseTimerWatcher void arm_timer(EventLoop &eloop, struct timespec &timeout) noexcept { - eloop.setTimer(this, timeout, clock); + eloop.setTimer(this, timeout, base_t::clock); } void arm_timer(EventLoop &eloop, struct timespec &timeout, struct timespec &interval) noexcept { - eloop.setTimer(this, timeout, interval, clock); + eloop.setTimer(this, timeout, interval, base_t::clock); } // Arm timer, relative to now: void arm_timer_rel(EventLoop &eloop, struct timespec &timeout) noexcept { - eloop.setTimerRel(this, timeout, clock); + eloop.setTimerRel(this, timeout, base_t::clock); } void arm_timer_rel(EventLoop &eloop, struct timespec &timeout, struct timespec &interval) noexcept { - eloop.setTimerRel(this, timeout, interval, clock); + eloop.setTimerRel(this, timeout, interval, base_t::clock); } + void stop_timer(EventLoop &eloop) noexcept + { + eloop.stop_timer(this, base_t::clock); + } + void deregister(EventLoop &eloop) noexcept { - eloop.deregister(this, clock); + eloop.deregister(this, this->clock); } // Timer expired, and the given number of intervals have elapsed before - // expiry evenet was queued. Normally intervals == 1 to indicate no + // expiry event was queued. Normally intervals == 1 to indicate no // overrun. - virtual rearm timer_expiry(EventLoop &eloop, int intervals) = 0; + // virtual rearm timer_expiry(EventLoop &eloop, int intervals) = 0; +}; + +template +class timer_impl : public timer +{ + void dispatch(void *loop_ptr) noexcept override + { + EventLoop &loop = *static_cast(loop_ptr); + loop.getBaseLock().unlock(); + + auto rearmType = static_cast(this)->timer_expiry(loop, this->intervals); + + loop.getBaseLock().lock(); + + if (rearmType != rearm::REMOVED) { + + this->active = false; + if (this->deleteme) { + // We don't want a watch that is marked "deleteme" to re-arm itself. + rearmType = rearm::REMOVE; + } + + loop.processTimerRearm(this, rearmType); + + if (rearmType == rearm::REMOVE) { + loop.getBaseLock().unlock(); + this->watch_removed(); + loop.getBaseLock().lock(); + } + } + } }; } // namespace dasynq::dprivate diff --git a/src/dinit-log.cc b/src/dinit-log.cc index f96e3aa..e402f6b 100644 --- a/src/dinit-log.cc +++ b/src/dinit-log.cc @@ -20,7 +20,7 @@ LogLevel log_level[2] = { LogLevel::WARN, LogLevel::WARN }; static ServiceSet *service_set = nullptr; // Reference to service set namespace { -class BufferedLogStream : public EventLoop_t::fd_watcher +class BufferedLogStream : public EventLoop_t::fd_watcher_impl { private: @@ -50,7 +50,7 @@ class BufferedLogStream : public EventLoop_t::fd_watcher release = false; } - rearm fd_event(EventLoop_t &loop, int fd, int flags) noexcept override; + rearm fd_event(EventLoop_t &loop, int fd, int flags) noexcept; // Check whether the console can be released. void flushForRelease(); diff --git a/src/dinit.cc b/src/dinit.cc index b7a19e3..53289a0 100644 --- a/src/dinit.cc +++ b/src/dinit.cc @@ -37,7 +37,7 @@ using namespace dasynq; -using EventLoop_t = event_loop; +using EventLoop_t = event_loop; EventLoop_t eventLoop = EventLoop_t(); @@ -52,9 +52,10 @@ void open_control_socket(bool report_ro_failure = true) noexcept; void setup_external_log() noexcept; -class ControlSocketWatcher : public EventLoop_t::fd_watcher +class ControlSocketWatcher : public EventLoop_t::fd_watcher_impl { - rearm fd_event(EventLoop_t &loop, int fd, int flags) override + public: + rearm fd_event(EventLoop_t &loop, int fd, int flags) noexcept { control_socket_cb(&loop, fd); return rearm::REARM; @@ -102,7 +103,7 @@ const char * get_user_home() namespace { - class CallbackSignalHandler : public EventLoop_t::signal_watcher + class CallbackSignalHandler : public EventLoop_t::signal_watcher_impl { public: typedef void (*cb_func_t)(EventLoop_t *); @@ -119,16 +120,16 @@ namespace { this->cb_func = cb_func; } - rearm received(EventLoop_t &eloop, int signo, SigInfo_p siginfo) override + rearm received(EventLoop_t &eloop, int signo, siginfo_p siginfo) { service_set->stop_all_services(ShutdownType::REBOOT); return rearm::REARM; } }; - class ControlSocketWatcher : public EventLoop_t::fd_watcher + class ControlSocketWatcher : public EventLoop_t::fd_watcher_impl { - rearm fd_event(EventLoop_t &loop, int fd, int flags) override + rearm fd_event(EventLoop_t &loop, int fd, int flags) { control_socket_cb(&loop, fd); return rearm::REARM; @@ -288,7 +289,7 @@ int main(int argc, char **argv) sigquit_watcher.setCbFunc(sigterm_cb); } - auto sigterm_watcher = CallbackSignalHandler(sigterm_cb); + CallbackSignalHandler sigterm_watcher {sigterm_cb}; sigint_watcher.add_watch(eventLoop, SIGINT); sigquit_watcher.add_watch(eventLoop, SIGQUIT); diff --git a/src/service.h b/src/service.h index 24f1e36..91c0bb8 100644 --- a/src/service.h +++ b/src/service.h @@ -184,16 +184,16 @@ static std::vector separate_args(std::string &s, std::list { public: ServiceRecord * service; - rearm child_status(EventLoop_t &eloop, pid_t child, int status) noexcept override; + rearm child_status(EventLoop_t &eloop, pid_t child, int status) noexcept; ServiceChildWatcher(ServiceRecord * sr) noexcept : service(sr) { } }; -class ServiceIoWatcher : public EventLoop_t::fd_watcher +class ServiceIoWatcher : public EventLoop_t::fd_watcher_impl { public: ServiceRecord * service;