Hopefully for the last time, as the Dasynq API is now stable.
// Control connection for dinit
using namespace dasynq;
-using EventLoop_t = event_loop<NullMutex>;
+using EventLoop_t = event_loop<null_mutex>;
class ControlConn;
class ControlConnWatcher;
class ServiceSet;
class ServiceRecord;
-class ControlConnWatcher : public EventLoop_t::bidi_fd_watcher
+class ControlConnWatcher : public EventLoop_t::bidi_fd_watcher_impl<ControlConnWatcher>
{
inline rearm receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept;
- rearm read_ready(EventLoop_t &loop, int fd) noexcept override
+ public:
+ rearm read_ready(EventLoop_t &loop, int fd) noexcept
{
return receiveEvent(loop, fd, IN_EVENTS);
}
- rearm write_ready(EventLoop_t &loop, int fd) noexcept override
+ rearm write_ready(EventLoop_t &loop, int fd) noexcept
{
return receiveEvent(loop, fd, OUT_EVENTS);
}
- public:
EventLoop_t * eventLoop;
void set_watches(int flags)
+++ /dev/null
-#ifndef DASYNC_BINARYHEAP_H_INCLUDED
-#define DASYNC_BINARYHEAP_H_INCLUDED
-
-#include "dasynq-svec.h"
-#include <type_traits>
-#include <functional>
-#include <limits>
-
-namespace dasynq {
-
-/**
- * Priority queue implementation based on a binary heap.
- *
- * Heap entry "handles" maintain an index into the heap. When the position of a node in the heap
- * changes, its handle must be updated.
- *
- * T : node data type
- * P : priority type (eg int)
- * Compare : functional object type to compare priorities
- */
-template <typename T, typename P, typename Compare = std::less<P>>
-class BinaryHeap
-{
- public:
- struct handle_t;
-
- private:
-
- // Actual heap node
- class HeapNode
- {
- public:
- P data;
- handle_t * hnd_p;
-
- HeapNode(handle_t * hnd, const P &odata) : data(odata), hnd_p(hnd)
- {
- // nothing to do
- }
- };
-
- svector<HeapNode> hvec;
-
- using hindex_t = typename decltype(hvec)::size_type;
-
- int root_node = -1;
- hindex_t num_nodes = 0;
-
- public:
-
- // Handle to an element on the heap in the node buffer; also contains the data associated
- // with the node. (Alternative implementation would be to store the heap data in a
- // separate container, and have the handle be an index into that container).
- struct handle_t
- {
- T hd;
- hindex_t heap_index;
- };
-
- // Initialise a handle (if it does not have a suitable constructor). Need not do anything
- // but may store a sentinel value to mark the handle as inactive. It should not be
- // necessary to call this, really.
- static void init_handle(handle_t &h) noexcept
- {
- }
-
- private:
-
- // In hindsight, I probably could have used std::priority_queue rather than re-implementing a
- // queue here. However, priority_queue doesn't expose the container (except as a protected
- // member) and so would make it more complicated to reserve storage. It would also have been
- // necessary to override assignment between elements to correctly update the reference in the
- // handle.
-
- bool bubble_down()
- {
- return bubble_down(hvec.size() - 1);
- }
-
- // Bubble a newly added timer down to the correct position
- bool bubble_down(hindex_t pos)
- {
- // int pos = v.size() - 1;
- Compare lt;
- while (pos > 0) {
- hindex_t parent = (pos - 1) / 2;
- if (! lt(hvec[pos].data, hvec[parent].data)) {
- break;
- }
-
- std::swap(hvec[pos], hvec[parent]);
- std::swap(hvec[pos].hnd_p->heap_index, hvec[parent].hnd_p->heap_index);
- pos = parent;
- }
-
- return pos == 0;
- }
-
- void bubble_up(hindex_t pos = 0)
- {
- Compare lt;
- hindex_t rmax = hvec.size();
- hindex_t max = (rmax - 1) / 2;
-
- while (pos <= max) {
- hindex_t selchild;
- hindex_t lchild = pos * 2 + 1;
- hindex_t rchild = lchild + 1;
- if (rchild >= rmax) {
- selchild = lchild;
- }
- else {
- // select the sooner of lchild and rchild
- selchild = lt(hvec[lchild].data, hvec[rchild].data) ? lchild : rchild;
- }
-
- if (! lt(hvec[selchild].data, hvec[pos].data)) {
- break;
- }
-
- std::swap(hvec[selchild].hnd_p->heap_index, hvec[pos].hnd_p->heap_index);
- std::swap(hvec[selchild], hvec[pos]);
- pos = selchild;
- }
- }
-
- void remove_h(hindex_t hidx)
- {
- // bvec[hvec[hidx].data_index].heap_index = -1;
- hvec[hidx].hnd_p->heap_index = -1;
- if (hvec.size() != hidx + 1) {
- // replace the first element with the last:
- // bvec[hvec.back().data_index].heap_index = hidx;
- hvec.back().hnd_p->heap_index = hidx;
- hvec[hidx] = hvec.back();
- hvec.pop_back();
-
- // Now bubble up:
- bubble_up(hidx);
- }
- else {
- hvec.pop_back();
- }
- }
-
- public:
-
- T & node_data(handle_t & index) noexcept
- {
- return index.hd;
- }
-
- // Allocate a slot, but do not incorporate into the heap:
- // u... : parameters for data constructor T::T(...)
- template <typename ...U> void allocate(handle_t & hnd, U... u)
- {
- new (& hnd.hd) T(u...);
- hnd.heap_index = -1;
- constexpr hindex_t max_allowed = std::numeric_limits<hindex_t>::is_signed ?
- std::numeric_limits<hindex_t>::max() : ((hindex_t) - 2);
-
- if (num_nodes == max_allowed) {
- throw std::bad_alloc();
- }
-
- num_nodes++;
-
- if (__builtin_expect(hvec.capacity() < num_nodes, 0)) {
- hindex_t half_point = max_allowed / 2;
- try {
- if (__builtin_expect(num_nodes < half_point, 1)) {
- hvec.reserve(num_nodes * 2);
- }
- else {
- hvec.reserve(max_allowed);
- }
- }
- catch (std::bad_alloc &e) {
- hvec.reserve(num_nodes);
- }
- }
- }
-
- // Deallocate a slot
- void deallocate(handle_t & index) noexcept
- {
- num_nodes--;
-
- // shrink the capacity of hvec if num_nodes is sufficiently less than
- // its current capacity:
- if (num_nodes < hvec.capacity() / 4) {
- hvec.shrink_to(num_nodes * 2);
- }
- }
-
- bool insert(handle_t & hnd, P pval = P()) noexcept
- {
- hnd.heap_index = hvec.size();
- hvec.emplace_back(&hnd, pval);
- return bubble_down();
- }
-
- // Get the root node handle. (Returns a handle_t or reference to handle_t).
- handle_t & get_root()
- {
- return * hvec[0].hnd_p;
- }
-
- P &get_root_priority()
- {
- return hvec[0].data;
- }
-
- void pull_root()
- {
- remove_h(0);
- }
-
- void remove(handle_t & hnd)
- {
- remove_h(hnd.heap_index);
- }
-
- bool empty()
- {
- return hvec.empty();
- }
-
- bool is_queued(handle_t hnd)
- {
- return hnd.heap_index != (hindex_t) -1;
- }
-
- // Set a node priority. Returns true iff the node becomes the root node (and wasn't before).
- bool set_priority(handle_t & hnd, const P& p)
- {
- int heap_index = hnd.heap_index;
-
- Compare lt;
- if (lt(hvec[heap_index].data, p)) {
- // Increase key
- hvec[heap_index].data = p;
- bubble_up(heap_index);
- return false;
- }
- else {
- // Decrease key
- hvec[heap_index].data = p;
- return bubble_down(heap_index);
- }
- }
-};
-
-}
-
-#endif
template <typename Base, typename Mutex = typename Base::mutex_t> class interrupt_channel;
// In the non-multi-thread case, this doesn't need to be supported:
-template <typename Base> class interrupt_channel<Base, NullMutex> : public Base
+template <typename Base> class interrupt_channel<Base, null_mutex> : public Base
{
public:
void interrupt_wait()
#include <time.h>
#include "dasynq-timerbase.h"
-#include "dasynq-binaryheap.h"
namespace dasynq {
// Timer implementation based on the (basically obselete) POSIX itimer interface.
-template <class Base> class ITimerEvents : public Base
+template <class Base> class ITimerEvents : public timer_base<Base>
{
private:
int timerfd_fd = -1;
- BinaryHeap<TimerData, struct timespec, CompareTimespec> timer_queue;
+ timer_queue_t timer_queue;
#if defined(__APPLE__)
#define itimerspec itimerval
#endif
- static int divide_timespec(const struct timespec &num, const struct timespec &den)
- {
- // TODO
- return 0;
- }
-
// Set the timerfd timeout to match the first timer in the queue (disable the timerfd
// if there are no active timers).
void set_timer_from_queue()
{
struct itimerspec newtime;
+ struct itimerval newalarm;
if (timer_queue.empty()) {
- newtime.it_value = {0, 0};
- newtime.it_interval = {0, 0};
+ newalarm.it_value = {0, 0};
+ newalarm.it_interval = {0, 0};
+ setitimer(ITIMER_REAL, &newalarm, nullptr);
+ return;
}
- else {
+
#if defined(__APPLE__)
- auto &rp = timer_queue.get_root_priority();
- newtime.it_value.tv_sec = rp.tv_sec;
- newtime.it_value.tv_usec = rp.tv_nsec / 1000;
+ auto &rp = timer_queue.get_root_priority();
+ newtime.it_value.tv_sec = rp.tv_sec;
+ newtime.it_value.tv_usec = rp.tv_nsec / 1000;
#else
- newtime.it_value = timer_queue.get_root_priority();
- newtime.it_interval = {0, 0};
+ newtime.it_value = timer_queue.get_root_priority();
+ newtime.it_interval = {0, 0};
#endif
- }
struct timespec curtime;
#if defined(__APPLE__)
#else
clock_gettime(CLOCK_MONOTONIC, &curtime);
#endif
- struct itimerval newalarm;
newalarm.it_interval = {0, 0};
newalarm.it_value.tv_sec = newtime.it_value.tv_sec - curtime.tv_sec;
#if defined(__APPLE__)
// should we use the REALTIME clock instead? I have no idea :/
#endif
- // Peek timer queue; calculate difference between current time and timeout
- struct timespec * timeout = &timer_queue.get_root_priority();
- while (timeout->tv_sec < curtime.tv_sec || (timeout->tv_sec == curtime.tv_sec &&
- timeout->tv_nsec <= curtime.tv_nsec)) {
- // Increment expiry count
- timer_queue.node_data(timer_queue.get_root()).expiry_count++;
- // (a periodic timer may have overrun; calculated below).
-
- auto thandle = timer_queue.get_root();
- TimerData &data = timer_queue.node_data(thandle);
- timespec &interval = data.interval_time;
- if (interval.tv_sec == 0 && interval.tv_nsec == 0) {
- // Non periodic timer
- timer_queue.pull_root();
- if (data.enabled) {
- int expiry_count = data.expiry_count;
- data.expiry_count = 0;
- Base::receiveTimerExpiry(thandle, timer_queue.node_data(thandle).userdata, expiry_count);
- }
- if (timer_queue.empty()) {
- break;
- }
- }
- else {
- // Periodic timer TODO
- // First calculate the overrun in time:
- /*
- struct timespec diff;
- diff.tv_sec = curtime.tv_sec - timeout->tv_sec;
- diff.tv_nsec = curtime.tv_nsec - timeout->tv_nsec;
- if (diff.tv_nsec < 0) {
- diff.tv_nsec += 1000000000;
- diff.tv_sec--;
- }
- */
- // Now we have to divide the time overrun by the period to find the
- // interval overrun. This requires a division of a value not representable
- // as a long...
- // TODO use divide_timespec
- // TODO better not to remove from queue maybe, but instead mark as inactive,
- // adjust timeout, and bubble into correct position
- // call Base::receieveTimerEvent
- // TODO
- }
-
- // repeat until all expired timeouts processed
- // timeout = &timer_queue[0].timeout;
- // (shouldn't be necessary; address hasn't changed...)
- }
+ timer_base<Base>::process_timer_queue(timer_queue, curtime);
+
// arm timerfd with timeout from head of queue
set_timer_from_queue();
- loop_mech.rearmSignalWatch_nolock(SIGALRM);
+ // loop_mech.rearmSignalWatch_nolock(SIGALRM);
return false; // don't disable signal watch
}
else {
void enableTimer_nolock(timer_handle_t &timer_id, bool enable, clock_type = clock_type::MONOTONIC) noexcept
{
- timer_queue.node_data(timer_id).enabled = enable;
+ auto &node_data = timer_queue.node_data(timer_id);
+ auto expiry_count = node_data.expiry_count;
+ if (expiry_count != 0) {
+ node_data.expiry_count = 0;
+ Base::receiveTimerExpiry(timer_id, node_data.userdata, expiry_count);
+ }
+ else {
+ timer_queue.node_data(timer_id).enabled = enable;
+ }
+ }
+
+ void stop_timer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
+ {
+ stop_timer_nolock(timer_id, clock);
+ }
+
+ void stop_timer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
+ {
+ if (timer_queue.is_queued(timer_id)) {
+ bool was_first = (&timer_queue.get_root()) == &timer_id;
+ timer_queue.remove(timer_id);
+ if (was_first) {
+ set_timer_from_queue();
+ }
+ }
}
};
close(kqfd);
}
- void setFilterEnabled(short filterType, uintptr_t ident, bool enable)
+ void setFilterEnabled(short filterType, uintptr_t ident, void *udata, bool enable)
{
+ // Note, on OpenBSD enabling or disabling filter will not alter the filter parameters (udata etc);
+ // on OS X however, it will. Therefore we set udata here (to the same value as it was originally
+ // set) in order to work correctly on both kernels.
struct kevent kev;
- EV_SET(&kev, ident, filterType, enable ? EV_ENABLE : EV_DISABLE, 0, 0, 0);
+ EV_SET(&kev, ident, filterType, enable ? EV_ENABLE : EV_DISABLE, 0, 0, udata);
kevent(kqfd, &kev, 1, nullptr, 0, nullptr);
}
void enableFdWatch(int fd, void *userdata, int flags)
{
- setFilterEnabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, true);
+ setFilterEnabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, userdata, true);
}
void enableFdWatch_nolock(int fd, void *userdata, int flags)
void disableFdWatch(int fd, int flags)
{
- setFilterEnabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, false);
+ setFilterEnabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, nullptr, false);
}
void disableFdWatch_nolock(int fd, int flags)
using DMutex = std::mutex;
// A "null" mutex, for which locking / unlocking actually does nothing.
-class NullMutex
+class null_mutex
{
DASYNQ_EMPTY_BODY
--- /dev/null
+#ifndef DASYNC_NARYHEAP_H_INCLUDED
+#define DASYNC_NARYHEAP_H_INCLUDED
+
+#include "dasynq-svec.h"
+#include <type_traits>
+#include <functional>
+#include <limits>
+
+namespace dasynq {
+
+/**
+ * Priority queue implementation based on an "Nary" heap (for power-of-2 N's).
+ *
+ * This is implemented somewhat as a heap-of-(binary)heaps. Eg with an N of 8, each mini heap has 7 elements;
+ * a mini-heap looks like:
+ *
+ * 0
+ * 1 2
+ * 3 4 5 6
+ *
+ * There are N child mini-heaps underneath each mini-heap.
+ *
+ * Heap entry "handles" maintain an index into the heap. When the position of a node in the heap
+ * changes, its handle must be updated.
+ *
+ * T : node data type
+ * P : priority type (eg int)
+ * Compare : functional object type to compare priorities
+ */
+template <typename T, typename P, typename Compare = std::less<P>, int N = 16>
+class NaryHeap
+{
+ public:
+ struct handle_t;
+ using handle_t_r = handle_t &;
+
+ private:
+
+ // Actual heap node
+ class HeapNode
+ {
+ public:
+ P data;
+ handle_t * hnd_p;
+
+ HeapNode(handle_t * hnd, const P &odata) : data(odata), hnd_p(hnd)
+ {
+ // nothing to do
+ }
+
+ HeapNode() { }
+ };
+
+ svector<HeapNode> hvec;
+
+ using hindex_t = typename decltype(hvec)::size_type;
+
+ int root_node = -1;
+ hindex_t num_nodes = 0;
+
+ public:
+
+ // Handle to an element on the heap in the node buffer; also contains the data associated
+ // with the node. (Alternative implementation would be to store the heap data in a
+ // separate container, and have the handle be an index into that container).
+ struct handle_t
+ {
+ T hd;
+ hindex_t heap_index;
+ };
+
+ // Initialise a handle (if it does not have a suitable constructor). Need not do anything
+ // but may store a sentinel value to mark the handle as inactive. It should not be
+ // necessary to call this, really.
+ static void init_handle(handle_t &h) noexcept
+ {
+ }
+
+ private:
+
+ // In hindsight, I probably could have used std::priority_queue rather than re-implementing a
+ // queue here. However, priority_queue doesn't expose the container (except as a protected
+ // member) and so would make it more complicated to reserve storage. It would also have been
+ // necessary to override assignment between elements to correctly update the reference in the
+ // handle.
+
+ bool bubble_down() noexcept
+ {
+ return bubble_down(hvec.size() - 1);
+ }
+
+ // Bubble a newly added timer down to the correct position
+ bool bubble_down(hindex_t pos) noexcept
+ {
+ P op = hvec[pos].data;
+ handle_t * ohndl = hvec[pos].hnd_p;
+ return bubble_down(pos, ohndl, op);
+ }
+
+ bool bubble_down(hindex_t pos, handle_t * ohndl, const P &op) noexcept
+ {
+ // int pos = v.size() - 1;
+ Compare lt;
+
+ while (pos > 0) {
+ // bubble down within micro heap:
+ auto mh_index = pos % (N - 1);
+ auto mh_base = pos - mh_index;
+ while (mh_index > 0) {
+ hindex_t parent = (mh_index - 1) / 2;
+ if (! lt(op, hvec[parent + mh_base].data)) {
+ pos = mh_index + mh_base;
+ hvec[pos].hnd_p = ohndl;
+ ohndl->heap_index = pos;
+ return false;
+ }
+
+ hvec[mh_index + mh_base] = hvec[parent + mh_base];
+ hvec[mh_index + mh_base].hnd_p->heap_index = mh_index + mh_base;
+ mh_index = parent;
+ }
+
+ pos = mh_base;
+ if (pos == 0) break;
+
+ auto containing_unit = pos / (N - 1);
+ auto parent_unit = (containing_unit - 1) / N;
+ auto rem = (containing_unit - 1) % N;
+ auto parent_idx = (rem / 2) + (N / 2) - 1;
+
+ hindex_t parent = parent_unit * (N - 1) + parent_idx;
+ if (! lt(op, hvec[parent].data)) {
+ break;
+ }
+
+ hvec[pos] = hvec[parent];
+ hvec[pos].hnd_p->heap_index = pos;
+ pos = parent;
+ }
+
+ hvec[pos].hnd_p = ohndl;
+ ohndl->heap_index = pos;
+
+ return pos == 0;
+ }
+
+ void bubble_up(hindex_t pos = 0)
+ {
+ P p = hvec[pos].data;
+ handle_t &hndl = *hvec[pos].hnd_p;
+ bubble_up(pos, hvec.size() - 1, hndl, p);
+ }
+
+ void bubble_up(hindex_t pos, hindex_t rmax, handle_t &h, const P &p) noexcept
+ {
+ Compare lt;
+
+ while (true) {
+ auto containing_unit = pos / (N - 1);
+ auto mh_index = pos % (N - 1);
+ auto mh_base = pos - mh_index;
+
+ while (mh_index < (N-2) / 2) {
+ hindex_t selchild;
+ hindex_t lchild = mh_index * 2 + 1 + mh_base;
+ hindex_t rchild = lchild + 1;
+
+ if (rchild >= rmax) {
+ if (lchild > rmax) {
+ hvec[pos].hnd_p = &h;
+ hvec[pos].data = p;
+ h.heap_index = pos;
+ return;
+ }
+ selchild = lchild;
+ }
+ else {
+ // select the sooner of lchild and rchild
+ selchild = lt(hvec[lchild].data, hvec[rchild].data) ? lchild : rchild;
+ }
+
+ if (! lt(hvec[selchild].data, p)) {
+ hvec[pos].hnd_p = &h;
+ hvec[pos].data = p;
+ h.heap_index = pos;
+ return;
+ }
+
+ hvec[pos] = hvec[selchild];
+ hvec[pos].hnd_p->heap_index = pos;
+ pos = selchild;
+ mh_index = pos - mh_base;
+ }
+
+ auto left_unit = containing_unit * N + (mh_index - (N/2 - 1)) * 2 + 1;
+ // auto right_unit = left_unit + 1;
+
+ hindex_t selchild;
+ hindex_t lchild = left_unit * (N - 1);
+ hindex_t rchild = lchild + (N - 1);
+ if (rchild >= rmax) {
+ if (lchild > rmax) {
+ break;
+ }
+ selchild = lchild;
+ }
+ else {
+ // select the sooner of lchild and rchild
+ selchild = lt(hvec[lchild].data, hvec[rchild].data) ? lchild : rchild;
+ }
+
+ if (! lt(hvec[selchild].data, p)) {
+ break;
+ }
+
+ hvec[pos] = hvec[selchild];
+ hvec[pos].hnd_p->heap_index = pos;
+ pos = selchild;
+ }
+
+ hvec[pos].hnd_p = &h;
+ hvec[pos].data = p;
+ h.heap_index = pos;
+ }
+
+ void remove_h(hindex_t hidx)
+ {
+ // bvec[hvec[hidx].data_index].heap_index = -1;
+ hvec[hidx].hnd_p->heap_index = -1;
+ if (hvec.size() != hidx + 1) {
+ // replace the first element with the last:
+ // bvec[hvec.back().data_index].heap_index = hidx;
+
+ //hvec.back().hnd_p->heap_index = hidx;
+ //hvec[hidx] = hvec.back();
+ //hvec.pop_back();
+
+ // Now bubble up:
+ //bubble_up(hidx);
+
+ bubble_up(hidx, hvec.size() - 2, *(hvec.back().hnd_p), hvec.back().data);
+ hvec.pop_back();
+ }
+ else {
+ hvec.pop_back();
+ }
+ }
+
+ public:
+
+ T & node_data(handle_t & index) noexcept
+ {
+ return index.hd;
+ }
+
+ // Allocate a slot, but do not incorporate into the heap:
+ // u... : parameters for data constructor T::T(...)
+ template <typename ...U> void allocate(handle_t & hnd, U... u)
+ {
+ new (& hnd.hd) T(u...);
+ hnd.heap_index = -1;
+ constexpr hindex_t max_allowed = std::numeric_limits<hindex_t>::is_signed ?
+ std::numeric_limits<hindex_t>::max() : ((hindex_t) - 2);
+
+ if (num_nodes == max_allowed) {
+ throw std::bad_alloc();
+ }
+
+ num_nodes++;
+
+ if (__builtin_expect(hvec.capacity() < num_nodes, 0)) {
+ hindex_t half_point = max_allowed / 2;
+ try {
+ if (__builtin_expect(num_nodes < half_point, 1)) {
+ hvec.reserve(num_nodes * 2);
+ }
+ else {
+ hvec.reserve(max_allowed);
+ }
+ }
+ catch (std::bad_alloc &e) {
+ hvec.reserve(num_nodes);
+ }
+ }
+ }
+
+ // Deallocate a slot
+ void deallocate(handle_t & index) noexcept
+ {
+ num_nodes--;
+
+ // shrink the capacity of hvec if num_nodes is sufficiently less than
+ // its current capacity:
+ if (num_nodes < hvec.capacity() / 4) {
+ hvec.shrink_to(num_nodes * 2);
+ }
+ }
+
+ bool insert(handle_t & hnd) noexcept
+ {
+ P pval = P();
+ return insert(hnd, pval);
+ }
+
+ bool insert(handle_t & hnd, const P & pval) noexcept
+ {
+ hvec.emplace_back(&hnd, pval);
+ return bubble_down(hvec.size() - 1, &hnd, pval);
+ }
+
+ // Get the root node handle. (Returns a handle_t or reference to handle_t).
+ handle_t & get_root()
+ {
+ return * hvec[0].hnd_p;
+ }
+
+ P &get_root_priority()
+ {
+ return hvec[0].data;
+ }
+
+ void pull_root()
+ {
+ remove_h(0);
+ }
+
+ void remove(handle_t & hnd)
+ {
+ remove_h(hnd.heap_index);
+ }
+
+ bool empty()
+ {
+ return hvec.empty();
+ }
+
+ bool is_queued(handle_t hnd)
+ {
+ return hnd.heap_index != (hindex_t) -1;
+ }
+
+ // Set a node priority. Returns true iff the node becomes the root node (and wasn't before).
+ bool set_priority(handle_t & hnd, const P& p)
+ {
+ int heap_index = hnd.heap_index;
+
+ Compare lt;
+ if (lt(hvec[heap_index].data, p)) {
+ // Increase key
+ hvec[heap_index].data = p;
+ bubble_up(heap_index);
+ return false;
+ }
+ else {
+ // Decrease key
+ hvec[heap_index].data = p;
+ return bubble_down(heap_index);
+ }
+ }
+};
+
+}
+
+#endif
{
return array[size_v - 1];
}
+
+ T* begin() const
+ {
+ return array;
+ }
+
+ T* end() const
+ {
+ return array + size_v;
+ }
};
#ifndef DASYNQ_TIMERBASE_H_INCLUDED
#define DASYNQ_TIMERBASE_H_INCLUDED
+#include "dasynq-naryheap.h"
+
namespace dasynq {
class TimerData
}
};
-using timer_handle_t = BinaryHeap<TimerData, struct timespec, CompareTimespec>::handle_t;
+using timer_queue_t = NaryHeap<TimerData, struct timespec, CompareTimespec>;
+using timer_handle_t = timer_queue_t::handle_t;
-static void init_timer_handle(timer_handle_t &hnd) noexcept
+static inline void init_timer_handle(timer_handle_t &hnd) noexcept
{
- BinaryHeap<TimerData, struct timespec, CompareTimespec>::init_handle(hnd);
+ timer_queue_t::init_handle(hnd);
}
+static inline int divide_timespec(const struct timespec &num, const struct timespec &den, struct timespec &rem)
+{
+ if (num.tv_sec < den.tv_sec) {
+ rem = num;
+ return 0;
+ }
+
+ if (num.tv_sec == den.tv_sec) {
+ if (num.tv_nsec < den.tv_nsec) {
+ rem = num;
+ return 0;
+ }
+ if (num.tv_sec == 0) {
+ rem.tv_sec = 0;
+ rem.tv_nsec = num.tv_nsec % den.tv_nsec;
+ return num.tv_nsec / den.tv_nsec;
+ }
+ // num.tv_sec == den.tv_sec and both are >= 1.
+ // The result can only be 1:
+ rem.tv_sec = 0;
+ rem.tv_nsec = num.tv_nsec - den.tv_nsec;
+ return 1;
+ }
+
+ // At this point, num.tv_sec >= 1.
+
+ auto &r_sec = rem.tv_sec;
+ auto &r_nsec = rem.tv_nsec;
+ r_sec = num.tv_sec;
+ r_nsec = num.tv_nsec;
+ auto d_sec = den.tv_sec;
+ auto d_nsec = den.tv_nsec;
+
+ r_sec -= d_sec;
+ if (r_nsec >= d_nsec) {
+ r_nsec -= d_nsec;
+ }
+ else {
+ r_nsec += (1000000000ULL - d_nsec);
+ r_sec -= 1;
+ }
+
+ // Check now for common case: one timer expiry with no overrun
+ if (r_sec < d_sec || (r_sec == d_sec && r_nsec < d_nsec)) {
+ return 1;
+ }
+
+ int nval = 1;
+ int rval = 1; // we have subtracted 1*D already
+
+ // shift denominator until it is greater than/equal to numerator:
+ while (d_sec < r_sec) {
+ d_sec *= 2;
+ d_nsec *= 2;
+ if (d_nsec >= 1000000000) {
+ d_nsec -= 1000000000;
+ d_sec++;
+ }
+ nval *= 2;
+ }
+
+ while (nval > 0) {
+ if (d_sec < r_sec || (d_sec == r_sec && d_nsec <= r_nsec)) {
+ // subtract:
+ r_sec -= d_sec;
+ if (d_nsec > r_nsec) {
+ r_nsec += 1000000000;
+ r_sec--;
+ }
+ r_nsec -= d_nsec;
+
+ rval += nval;
+ }
+
+ bool low = d_sec & 1;
+ d_nsec /= 2;
+ d_nsec += low ? 500000000ULL : 0;
+ d_sec /= 2;
+ nval /= 2;
+ }
+
+ return rval;
+}
+
+template <typename Base> class timer_base : public Base
+{
+ protected:
+
+ void process_timer_queue(timer_queue_t &queue, const struct timespec &curtime)
+ {
+ // Peek timer queue; calculate difference between current time and timeout
+ const struct timespec * timeout = &queue.get_root_priority();
+ while (timeout->tv_sec < curtime.tv_sec || (timeout->tv_sec == curtime.tv_sec &&
+ timeout->tv_nsec <= curtime.tv_nsec)) {
+ auto & thandle = queue.get_root();
+ TimerData &data = queue.node_data(thandle);
+ timespec &interval = data.interval_time;
+ data.expiry_count++;
+ queue.pull_root();
+ if (interval.tv_sec == 0 && interval.tv_nsec == 0) {
+ // Non periodic timer
+ if (data.enabled) {
+ data.enabled = false;
+ int expiry_count = data.expiry_count;
+ data.expiry_count = 0;
+ Base::receiveTimerExpiry(thandle, data.userdata, expiry_count);
+ }
+ if (queue.empty()) {
+ break;
+ }
+ }
+ else {
+ // First calculate the overrun in time:
+ struct timespec diff;
+ diff.tv_sec = curtime.tv_sec - timeout->tv_sec;
+ if (curtime.tv_nsec > timeout->tv_nsec) {
+ diff.tv_nsec = curtime.tv_nsec - timeout->tv_nsec;
+ }
+ else {
+ diff.tv_nsec = 1000000000 - timeout->tv_nsec + curtime.tv_nsec;
+ diff.tv_sec--;
+ }
+
+ // Now we have to divide the time overrun by the period to find the
+ // interval overrun. This requires a division of a value not representable
+ // as a long...
+ struct timespec rem;
+ data.expiry_count += divide_timespec(diff, interval, rem);
+
+ // new time is current time + interval - remainder:
+ struct timespec newtime = curtime;
+ newtime.tv_sec += interval.tv_sec;
+ newtime.tv_nsec += interval.tv_nsec;
+ if (newtime.tv_nsec > 1000000000) {
+ newtime.tv_nsec -= 1000000000;
+ newtime.tv_sec++;
+ }
+ newtime.tv_sec -= rem.tv_sec;
+ if (rem.tv_nsec > newtime.tv_nsec) {
+ newtime.tv_nsec += 1000000000 - rem.tv_nsec;
+ newtime.tv_sec--;
+ }
+ else {
+ newtime.tv_nsec -= rem.tv_nsec;
+ }
+
+ queue.insert(thandle, newtime);
+ if (data.enabled) {
+ data.enabled = false;
+ int expiry_count = data.expiry_count;
+ data.expiry_count = 0;
+ Base::receiveTimerExpiry(thandle, data.userdata, expiry_count);
+ }
+ }
+
+ // repeat until all expired timeouts processed
+ timeout = &queue.get_root_priority();
+ }
+ }
+};
+
}
#endif /* DASYNQ_TIMERBASE_H_INCLUDED */
#include <time.h>
#include "dasynq-timerbase.h"
-#include "dasynq-binaryheap.h"
namespace dasynq {
// we are given a handle; we need to use this to modify the watch. We delegate the
// process of allocating a handle to a priority heap implementation (BinaryHeap).
-template <class Base> class TimerFdEvents : public Base
+template <class Base> class TimerFdEvents : public timer_base<Base>
{
private:
int timerfd_fd = -1;
int systemtime_fd = -1;
- using timer_queue_t = BinaryHeap<TimerData, struct timespec, CompareTimespec>;
timer_queue_t timer_queue;
timer_queue_t wallclock_queue;
- static int divide_timespec(const struct timespec &num, const struct timespec &den) noexcept
- {
- // TODO
- return 0;
- }
-
// Set the timerfd timeout to match the first timer in the queue (disable the timerfd
// if there are no active timers).
static void set_timer_from_queue(int fd, timer_queue_t &queue) noexcept
DASYNQ_UNREACHABLE;
}
- // Peek timer queue; calculate difference between current time and timeout
- struct timespec * timeout = &queue.get_root_priority();
- while (timeout->tv_sec < curtime.tv_sec || (timeout->tv_sec == curtime.tv_sec &&
- timeout->tv_nsec <= curtime.tv_nsec)) {
- // Increment expiry count
- queue.node_data(queue.get_root()).expiry_count++;
- // (a periodic timer may have overrun; calculated below).
+ timer_base<Base>::process_timer_queue(queue, curtime);
- auto thandle = queue.get_root();
- TimerData &data = queue.node_data(thandle);
- timespec &interval = data.interval_time;
- if (interval.tv_sec == 0 && interval.tv_nsec == 0) {
- // Non periodic timer
- queue.pull_root();
- if (data.enabled) {
- int expiry_count = data.expiry_count;
- data.expiry_count = 0;
- Base::receiveTimerExpiry(thandle, data.userdata, expiry_count);
- }
- if (queue.empty()) {
- break;
- }
- }
- else {
- // Periodic timer TODO
- // First calculate the overrun in time:
- /*
- struct timespec diff;
- diff.tv_sec = curtime.tv_sec - timeout->tv_sec;
- diff.tv_nsec = curtime.tv_nsec - timeout->tv_nsec;
- if (diff.tv_nsec < 0) {
- diff.tv_nsec += 1000000000;
- diff.tv_sec--;
- }
- */
- // Now we have to divide the time overrun by the period to find the
- // interval overrun. This requires a division of a value not representable
- // as a long...
- // TODO use divide_timespec
- // TODO better not to remove from queue maybe, but instead mark as inactive,
- // adjust timeout, and bubble into correct position
- // call Base::receieveTimerEvent
- // TODO
- }
-
- // repeat until all expired timeouts processed
- // timeout = &timer_queue[0].timeout;
- // (shouldn't be necessary; address hasn't changed...)
- }
// arm timerfd with timeout from head of queue
set_timer_from_queue(fd, queue);
}
// TODO locking (here and everywhere)
}
+ timer_queue_t & get_queue(clock_type clock)
+ {
+ switch(clock) {
+ case clock_type::SYSTEM:
+ return wallclock_queue;
+ case clock_type::MONOTONIC:
+ return timer_queue;
+ default:
+ DASYNQ_UNREACHABLE;
+ }
+ }
+
public:
template <typename T>
void receiveFdEvent(T &loop_mech, typename Base::FD_r fd_r, void * userdata, int flags)
// Add timer, store into given handle
void addTimer(timer_handle_t &h, void *userdata, clock_type clock = clock_type::MONOTONIC)
{
- switch(clock) {
- case clock_type::SYSTEM:
- wallclock_queue.allocate(h, userdata);
- break;
- case clock_type::MONOTONIC:
- timer_queue.allocate(h, userdata);
- break;
- default:
- DASYNQ_UNREACHABLE;
- }
+ timer_queue_t & queue = get_queue(clock);
+ queue.allocate(h, userdata);
}
void removeTimer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
void removeTimer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
{
- switch(clock) {
- case clock_type::SYSTEM:
- if (wallclock_queue.is_queued(timer_id)) {
- wallclock_queue.remove(timer_id);
- }
- wallclock_queue.deallocate(timer_id);
- break;
- case clock_type::MONOTONIC:
- if (timer_queue.is_queued(timer_id)) {
- timer_queue.remove(timer_id);
- }
- timer_queue.deallocate(timer_id);
- break;
- default:
- DASYNQ_UNREACHABLE;
+ timer_queue_t & queue = get_queue(clock);
+ if (queue.is_queued(timer_id)) {
+ queue.remove(timer_id);
}
+ queue.deallocate(timer_id);
}
-
+
+ void stop_timer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
+ {
+ stop_timer_nolock(timer_id, clock);
+ }
+
+ void stop_timer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
+ {
+ timer_queue_t & queue = get_queue(clock);
+ if (queue.is_queued(timer_id)) {
+ queue.remove(timer_id);
+ }
+ }
+
// starts (if not started) a timer to timeout at the given time. Resets the expiry count to 0.
// enable: specifies whether to enable reporting of timeouts/intervals
void setTimer(timer_handle_t & timer_id, struct timespec &timeout, struct timespec &interval,
void enableTimer_nolock(timer_handle_t & timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
{
- switch (clock) {
- case clock_type::SYSTEM:
- wallclock_queue.node_data(timer_id).enabled = enable;
- break;
- case clock_type::MONOTONIC:
- timer_queue.node_data(timer_id).enabled = enable;
- break;
- default:
- DASYNQ_UNREACHABLE;
+ timer_queue_t & queue = get_queue(clock);
+
+ auto &node_data = queue.node_data(timer_id);
+ auto expiry_count = node_data.expiry_count;
+ if (expiry_count != 0) {
+ node_data.expiry_count = 0;
+ Base::receiveTimerExpiry(timer_id, node_data.userdata, expiry_count);
+ }
+ else {
+ queue.node_data(timer_id).enabled = enable;
}
}
#include "dasynq-config.h"
#include "dasynq-flags.h"
-#include "dasynq-binaryheap.h"
+#include "dasynq-naryheap.h"
#include "dasynq-interrupt.h"
+// Dasynq uses a "mix-in" pattern to produce an event loop implementation incorporating selectable implementations of
+// various components (main backend, timers, child process watch mechanism etc). In C++ this can be achieved by
+// a template for some component which extends its own type parameter:
+//
+// template <typename Base> class X : public B { .... }
+//
+// We can chain several such components together (and so so below) to "mix in" the functionality of each into the final
+// class, eg:
+//
+// template <typename T> using Loop = EpollLoop<interrupt_channel<TimerFdEvents<ChildProcEvents<T>>>>;
+//
+// (which defines an alias template "Loop", whose implementation will use the epoll backend, a standard interrupt channel
+// implementation, a timerfd-based timer implementation, and the standard child process watch implementation).
+// We sometimes need the base class to be able to call derived-class members: to do this we pass a reference to
+// the derived instance into a templated method in the base, called "init":
+//
+// template <typename T> void init(T *derived)
+// {
+// // can call method on derived:
+// derived->add_listener();
+// // chain to next class:
+// Base::init(derived);
+// }
+//
+// At the base all this is the EventDispatch class, defined below, which receives event notifications and inserts
+// them into a queue for processing. The event_loop class, also below, wraps this (via composition) in an interface
+// which can be used to register/de-regsiter/enable/disable event watchers, and which can process the queued events
+// by calling the watcher callbacks. The event_loop class also provides some synchronisation to ensure thread-safety.
+
#if defined(DASYNQ_CUSTOM_LOOP_IMPLEMENTATION)
// Loop and LoopTraits defined already; used for testing
#elif defined(DASYNQ_HAVE_KQUEUE)
class BaseWatcher;
}
-using PrioQueue = BinaryHeap<dprivate::BaseWatcher *, int>;
+using PrioQueue = NaryHeap<dprivate::BaseWatcher *, int>;
inline namespace {
constexpr int DEFAULT_PRIORITY = 50;
template <typename T_Loop> class child_proc_watcher;
template <typename T_Loop> class timer;
+ template <typename, typename> class fd_watcher_impl;
+ template <typename, typename> class bidi_fd_watcher_impl;
+ template <typename, typename> class signal_watcher_impl;
+ template <typename, typename> class child_proc_watcher_impl;
+ template <typename, typename> class timer_impl;
+
enum class WatchType
{
SIGNAL,
BaseWatcher(WatchType wt) noexcept : watchType(wt) { }
+ virtual void dispatch(void *loop_ptr) noexcept { };
+ virtual void dispatch_second(void *loop_ptr) noexcept { }
+
virtual ~BaseWatcher() noexcept { }
// Called when the watcher has been removed.
BaseSignalWatcher() : BaseWatcher(WatchType::SIGNAL) { }
public:
- using SigInfo = typename Traits::SigInfo;
- typedef SigInfo &SigInfo_p;
+ using siginfo_t = typename Traits::SigInfo;
+ typedef siginfo_t &siginfo_p;
};
template <typename T_Mutex>
// The main instance is the "input" watcher only; we keep a secondary watcher
// with a secondary set of flags for the "output" watcher:
- BaseWatcher outWatcher = BaseWatcher(WatchType::SECONDARYFD);
+ BaseWatcher outWatcher {WatchType::SECONDARYFD};
int read_removed : 1; // read watch removed?
int write_removed : 1; // write watch removed?
protected:
timer_handle_t timer_handle;
int intervals;
+ clock_type clock;
BaseTimerWatcher() : BaseWatcher(WatchType::TIMER)
{
template <typename T_Mutex> class waitqueue;
template <typename T_Mutex> class waitqueue_node;
- // Select an appropriate conditiona variable type for a mutex:
+ // Select an appropriate condition variable type for a mutex:
// condition_variable if mutex is std::mutex, or condition_variable_any
// otherwise.
template <class T_Mutex> class condvarSelector;
typedef std::condition_variable_any condvar;
};
- template <> class waitqueue_node<NullMutex>
+ template <> class waitqueue_node<null_mutex>
{
- // Specialised waitqueue_node for NullMutex.
- friend class waitqueue<NullMutex>;
+ // Specialised waitqueue_node for null_mutex.
+ friend class waitqueue<null_mutex>;
public:
- void wait(std::unique_lock<NullMutex> &ul) { }
+ void wait(std::unique_lock<null_mutex> &ul) { }
void signal() { }
DASYNQ_EMPTY_BODY;
}
};
- template <> class waitqueue<NullMutex>
+ template <> class waitqueue<null_mutex>
{
public:
- waitqueue_node<NullMutex> * unqueue()
+ waitqueue_node<null_mutex> * unqueue()
{
return nullptr;
}
- waitqueue_node<NullMutex> * getHead()
+ waitqueue_node<null_mutex> * getHead()
{
return nullptr;
}
- bool checkHead(waitqueue_node<NullMutex> &node)
+ bool checkHead(waitqueue_node<null_mutex> &node)
{
return true;
}
return true;
}
- void queue(waitqueue_node<NullMutex> *node)
+ void queue(waitqueue_node<null_mutex> *node)
{
}
};
if (is_multi_watch) {
BaseBidiFdWatcher *bbdw = static_cast<BaseBidiFdWatcher *>(bwatcher);
bbdw->watch_flags &= ~flags;
- if (flags & IN_EVENTS && flags & OUT_EVENTS) {
+ if ((flags & IN_EVENTS) && (flags & OUT_EVENTS)) {
// Queue the secondary watcher first:
queueWatcher(&bbdw->outWatcher);
}
return nullptr;
}
- auto rhndl = event_queue.get_root();
+ auto & rhndl = event_queue.get_root();
BaseWatcher *r = event_queue.node_data(rhndl);
event_queue.pull_root();
return r;
friend class dprivate::child_proc_watcher<my_event_loop_t>;
friend class dprivate::timer<my_event_loop_t>;
+ template <typename, typename> friend class dprivate::fd_watcher_impl;
+ template <typename, typename> friend class dprivate::bidi_fd_watcher_impl;
+ template <typename, typename> friend class dprivate::signal_watcher_impl;
+ template <typename, typename> friend class dprivate::child_proc_watcher_impl;
+ template <typename, typename> friend class dprivate::timer_impl;
+
public:
using loop_traits_t = LoopTraits;
loop_mech.setTimerRel(callBack->timer_handle, timeout, interval, true, clock);
}
+ void stop_timer(BaseTimerWatcher *callback, clock_type clock)
+ {
+ loop_mech.stop_timer(callback->timer_handle, clock);
+ }
+
void deregister(BaseTimerWatcher *callback, clock_type clock)
{
loop_mech.removeTimer(callback->timer_handle, clock);
}
}
else if (rearmType == rearm::DISARM) {
- // Nothing more to do
+ // TODO should actually disarm.
}
else if (rearmType == rearm::REARM) {
bdfw->watch_flags |= IN_EVENTS;
}
}
else if (rearmType == rearm::DISARM) {
- // Nothing more to do
+ // TODO actually disarm.
}
else if (rearmType == rearm::REARM) {
bdfw->watch_flags |= OUT_EVENTS;
{
// Called with lock held
if (rearmType == rearm::REARM) {
- loop_mech.enableTimer_nolock(btw->timer_handle, true);
+ loop_mech.enableTimer_nolock(btw->timer_handle, true, btw->clock);
}
else if (rearmType == rearm::REMOVE) {
- loop_mech.removeTimer_nolock(btw->timer_handle);
+ loop_mech.removeTimer_nolock(btw->timer_handle, btw->clock);
}
+ // TODO DISARM?
}
bool processEvents() noexcept
ed.lock.lock();
// So this pulls *all* currently pending events and processes them in the current thread.
- // That's probably good for throughput, but maybe the behavior should be configurable.
+ // That's probably good for throughput, but maybe the behaviour should be configurable.
BaseWatcher * pqueue = ed.pullEvent();
bool active = false;
pqueue->active = true;
active = true;
- rearm rearmType = rearm::NOOP;
- bool is_multi_watch = false;
BaseBidiFdWatcher *bbfw = nullptr;
// (Above variables are initialised only to silence compiler warnings).
- // Read/manipulate watch_flags (if necessary) *before* we release the lock:
- if (pqueue->watchType == WatchType::FD) {
- BaseFdWatcher *bfw = static_cast<BaseFdWatcher *>(pqueue);
- bbfw = static_cast<BaseBidiFdWatcher *>(bfw);
- is_multi_watch = bfw->watch_flags & dprivate::multi_watch;
- if (! LoopTraits::has_separate_rw_fd_watches && is_multi_watch) {
- // Clear the input watch flags to avoid enabling read watcher while active:
- bfw->watch_flags &= ~IN_EVENTS;
- }
- }
- else if (pqueue->watchType == WatchType::SECONDARYFD) {
- is_multi_watch = true;
+ if (pqueue->watchType == WatchType::SECONDARYFD) {
// construct a pointer to the main watcher:
char * rp = (char *)pqueue;
rp -= offsetof(BaseBidiFdWatcher, outWatcher);
bbfw = (BaseBidiFdWatcher *)rp;
- if (! LoopTraits::has_separate_rw_fd_watches) {
- bbfw->watch_flags &= ~OUT_EVENTS;
- }
- }
-
- ed.lock.unlock();
-
- // Note that we select actions based on the type of the watch, as determined by the watchType
- // member. In some ways this screams out for polmorphism; a virtual function could be overridden
- // by each of the watcher types. I've instead used switch/case because I think it will perform
- // slightly better without the overhead of a virtual function dispatch, but it's got to be a
- // close call; I might be guilty of premature optimisation here.
-
- switch (pqueue->watchType) {
- case WatchType::SIGNAL: {
- BaseSignalWatcher *bsw = static_cast<BaseSignalWatcher *>(pqueue);
- rearmType = ((signal_watcher *)bsw)->received(*this, bsw->siginfo.get_signo(), bsw->siginfo);
- break;
- }
- case WatchType::FD: {
- BaseFdWatcher *bfw = static_cast<BaseFdWatcher *>(pqueue);
- if (is_multi_watch) {
- // The primary watcher for a multi-watch watcher is queued for
- // read events.
- rearmType = ((bidi_fd_watcher *)bbfw)->read_ready(*this, bfw->watch_fd);
- bbfw->event_flags &= ~IN_EVENTS;
- }
- else {
- rearmType = ((fd_watcher *)bfw)->fd_event(*this, bfw->watch_fd, bfw->event_flags);
- bfw->event_flags = 0;
- }
- break;
- }
- case WatchType::CHILD: {
- BaseChildWatcher *bcw = static_cast<BaseChildWatcher *>(pqueue);
- rearmType = ((child_proc_watcher *)bcw)->child_status(*this, bcw->watch_pid, bcw->child_status);
- break;
- }
- case WatchType::SECONDARYFD: {
- rearmType = ((bidi_fd_watcher *)bbfw)->write_ready(*this, bbfw->watch_fd);
- bbfw->event_flags &= ~OUT_EVENTS;
- break;
- }
- case WatchType::TIMER: {
- BaseTimerWatcher *btw = static_cast<BaseTimerWatcher *>(pqueue);
- rearmType = ((timer *)btw)->timer_expiry(*this, btw->intervals);
- break;
- }
- default: ;
- }
- ed.lock.lock();
-
- // (if REMOVED, we must not touch pqueue at all)
- if (rearmType != rearm::REMOVED) {
-
- pqueue->active = false;
- if (pqueue->deleteme) {
- // We don't want a watch that is marked "deleteme" to re-arm itself.
- rearmType = rearm::REMOVE;
- }
- switch (pqueue->watchType) {
- case WatchType::SIGNAL:
- processSignalRearm(static_cast<BaseSignalWatcher *>(pqueue), rearmType);
- break;
- case WatchType::FD:
- rearmType = processFdRearm(static_cast<BaseFdWatcher *>(pqueue), rearmType, is_multi_watch);
- break;
- case WatchType::SECONDARYFD:
- rearmType = processSecondaryRearm(bbfw, rearmType);
- break;
- case WatchType::TIMER:
- processTimerRearm(static_cast<BaseTimerWatcher *>(pqueue), rearmType);
- break;
- default: ;
- }
-
- if (rearmType == rearm::REMOVE) {
- ed.lock.unlock();
- // Note that for BidiFd watches, watch_removed is only called on the primary watch.
- // The process function called above only returns Rearm::REMOVE if both primary and
- // secondary watches have been removed.
- (is_multi_watch ? bbfw : pqueue)->watch_removed();
- ed.lock.lock();
- }
+ // issue a secondary dispatch:
+ bbfw->dispatch_second(this);
+ pqueue = ed.pullEvent();
+ continue;
}
-
+
+ pqueue->dispatch(this);
pqueue = ed.pullEvent();
}
using child_proc_watcher = dprivate::child_proc_watcher<my_event_loop_t>;
using timer = dprivate::timer<my_event_loop_t>;
+ template <typename D> using fd_watcher_impl = dprivate::fd_watcher_impl<my_event_loop_t, D>;
+ template <typename D> using bidi_fd_watcher_impl = dprivate::bidi_fd_watcher_impl<my_event_loop_t, D>;
+ template <typename D> using signal_watcher_impl = dprivate::signal_watcher_impl<my_event_loop_t, D>;
+ template <typename D> using child_proc_watcher_impl = dprivate::child_proc_watcher_impl<my_event_loop_t, D>;
+ template <typename D> using timer_impl = dprivate::timer_impl<my_event_loop_t, D>;
+
void run() noexcept
{
while (! processEvents()) {
}
};
-typedef event_loop<NullMutex> NEventLoop;
+typedef event_loop<null_mutex> NEventLoop;
typedef event_loop<std::mutex> TEventLoop;
// from dasync.cc:
template <typename EventLoop>
class signal_watcher : private dprivate::BaseSignalWatcher<typename EventLoop::mutex_t, typename EventLoop::loop_traits_t>
{
+ template <typename, typename> friend class signal_watcher_impl;
+
using BaseWatcher = dprivate::BaseWatcher;
using T_Mutex = typename EventLoop::mutex_t;
-public:
- using SigInfo_p = typename dprivate::BaseSignalWatcher<T_Mutex, typename EventLoop::loop_traits_t>::SigInfo_p;
+ public:
+ using siginfo_p = typename dprivate::BaseSignalWatcher<T_Mutex, typename EventLoop::loop_traits_t>::siginfo_p;
// Register this watcher to watch the specified signal.
// If an attempt is made to register with more than one event loop at
template <typename T>
static signal_watcher<EventLoop> *add_watch(EventLoop &eloop, int signo, T watchHndlr)
{
- class LambdaSigWatcher : public signal_watcher<EventLoop>
+ class LambdaSigWatcher : public signal_watcher_impl<EventLoop, LambdaSigWatcher>
{
private:
T watchHndlr;
//
}
- rearm received(EventLoop &eloop, int signo, SigInfo_p siginfo) override
+ rearm received(EventLoop &eloop, int signo, siginfo_p siginfo)
{
return watchHndlr(eloop, signo, siginfo);
}
return lsw;
}
- virtual rearm received(EventLoop &eloop, int signo, SigInfo_p siginfo) = 0;
+ // virtual rearm received(EventLoop &eloop, int signo, siginfo_p siginfo) = 0;
+};
+
+template <typename EventLoop, typename Derived>
+class signal_watcher_impl : public signal_watcher<EventLoop>
+{
+ void dispatch(void *loop_ptr) noexcept override
+ {
+ EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
+ loop.getBaseLock().unlock();
+
+ auto rearmType = static_cast<Derived *>(this)->received(loop, this->siginfo.get_signo(), this->siginfo);
+
+ loop.getBaseLock().lock();
+
+ if (rearmType != rearm::REMOVED) {
+
+ this->active = false;
+ if (this->deleteme) {
+ // We don't want a watch that is marked "deleteme" to re-arm itself.
+ rearmType = rearm::REMOVE;
+ }
+
+ loop.processSignalRearm(this, rearmType);
+
+ if (rearmType == rearm::REMOVE) {
+ loop.getBaseLock().unlock();
+ this->watch_removed();
+ loop.getBaseLock().lock();
+ }
+ }
+ }
};
// Posix file descriptor event watcher
template <typename EventLoop>
class fd_watcher : private dprivate::BaseFdWatcher<typename EventLoop::mutex_t>
{
+ template <typename, typename> friend class fd_watcher_impl;
+
using BaseWatcher = dprivate::BaseWatcher;
using T_Mutex = typename EventLoop::mutex_t;
template <typename T>
static fd_watcher<EventLoop> *add_watch(EventLoop &eloop, int fd, int flags, T watchHndlr)
{
- class LambdaFdWatcher : public fd_watcher<EventLoop>
+ class LambdaFdWatcher : public fd_watcher_impl<EventLoop, LambdaFdWatcher>
{
private:
T watchHndlr;
-
+
public:
LambdaFdWatcher(T watchHandlr_a) : watchHndlr(watchHandlr_a)
{
//
}
-
- rearm fd_event(EventLoop &eloop, int fd, int flags) override
+
+ rearm fd_event(EventLoop &eloop, int fd, int flags)
{
return watchHndlr(eloop, fd, flags);
}
-
+
void watch_removed() noexcept override
{
delete this;
return lfd;
}
- virtual rearm fd_event(EventLoop &eloop, int fd, int flags) = 0;
+ // virtual rearm fd_event(EventLoop &eloop, int fd, int flags) = 0;
+};
+
+template <typename EventLoop, typename Derived>
+class fd_watcher_impl : public fd_watcher<EventLoop>
+{
+ void dispatch(void *loop_ptr) noexcept override
+ {
+ EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
+ loop.getBaseLock().unlock();
+
+ auto rearmType = static_cast<Derived *>(this)->fd_event(loop, this->watch_fd, this->event_flags);
+
+ loop.getBaseLock().lock();
+
+ if (rearmType != rearm::REMOVED) {
+ this->event_flags = 0;
+ this->active = false;
+ if (this->deleteme) {
+ // We don't want a watch that is marked "deleteme" to re-arm itself.
+ rearmType = rearm::REMOVE;
+ }
+
+ rearmType = loop.processFdRearm(this, rearmType, false);
+
+ if (rearmType == rearm::REMOVE) {
+ loop.getBaseLock().unlock();
+ this->watch_removed();
+ loop.getBaseLock().lock();
+ }
+ }
+ }
};
+
// A Bi-directional file descriptor watcher with independent read- and write- channels.
// This watcher type has two event notification methods which can both potentially be
// active at the same time.
template <typename EventLoop>
class bidi_fd_watcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::mutex_t>
{
+ template <typename, typename> friend class bidi_fd_watcher_impl;
+
using BaseWatcher = dprivate::BaseWatcher;
using T_Mutex = typename EventLoop::mutex_t;
eloop.deregister(this, this->watch_fd);
}
- virtual rearm read_ready(EventLoop &eloop, int fd) noexcept = 0;
- virtual rearm write_ready(EventLoop &eloop, int fd) noexcept = 0;
+ // virtual rearm read_ready(EventLoop &eloop, int fd) noexcept = 0;
+ // virtual rearm write_ready(EventLoop &eloop, int fd) noexcept = 0;
+};
+
+template <typename EventLoop, typename Derived>
+class bidi_fd_watcher_impl : public bidi_fd_watcher<EventLoop>
+{
+ void dispatch(void *loop_ptr) noexcept override
+ {
+ EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
+ loop.getBaseLock().unlock();
+
+ auto rearmType = static_cast<Derived *>(this)->read_ready(loop, this->watch_fd);
+
+ loop.getBaseLock().lock();
+
+ if (rearmType != rearm::REMOVED) {
+ this->event_flags &= ~IN_EVENTS;
+ this->active = false;
+ if (this->deleteme) {
+ // We don't want a watch that is marked "deleteme" to re-arm itself.
+ rearmType = rearm::REMOVE;
+ }
+
+ rearmType = loop.processFdRearm(this, rearmType, true);
+
+ if (rearmType == rearm::REMOVE) {
+ loop.getBaseLock().unlock();
+ this->watch_removed();
+ loop.getBaseLock().lock();
+ }
+ }
+ }
+
+ void dispatch_second(void *loop_ptr) noexcept override
+ {
+ EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
+ loop.getBaseLock().unlock();
+
+ auto rearmType = static_cast<Derived *>(this)->write_ready(loop, this->watch_fd);
+
+ loop.getBaseLock().lock();
+
+ if (rearmType != rearm::REMOVED) {
+ this->event_flags &= ~OUT_EVENTS;
+ this->active = false;
+ if (this->deleteme) {
+ // We don't want a watch that is marked "deleteme" to re-arm itself.
+ rearmType = rearm::REMOVE;
+ }
+
+ rearmType = loop.processSecondaryRearm(this, rearmType);
+
+ if (rearmType == rearm::REMOVE) {
+ loop.getBaseLock().unlock();
+ this->watch_removed();
+ loop.getBaseLock().lock();
+ }
+ }
+ }
};
// Child process event watcher
template <typename EventLoop>
class child_proc_watcher : private dprivate::BaseChildWatcher<typename EventLoop::mutex_t>
{
+ template <typename, typename> friend class child_proc_watcher_impl;
+
using BaseWatcher = dprivate::BaseWatcher;
using T_Mutex = typename EventLoop::mutex_t;
}
}
- virtual rearm child_status(EventLoop &eloop, pid_t child, int status) = 0;
+ // virtual rearm child_status(EventLoop &eloop, pid_t child, int status) = 0;
+};
+
+template <typename EventLoop, typename Derived>
+class child_proc_watcher_impl : public child_proc_watcher<EventLoop>
+{
+ void dispatch(void *loop_ptr) noexcept override
+ {
+ EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
+ loop.getBaseLock().unlock();
+
+ auto rearmType = static_cast<Derived *>(this)->child_status(loop, this->watch_pid, this->child_status);
+
+ loop.getBaseLock().lock();
+
+ if (rearmType != rearm::REMOVED) {
+
+ this->active = false;
+ if (this->deleteme) {
+ // We don't want a watch that is marked "deleteme" to re-arm itself.
+ rearmType = rearm::REMOVE;
+ }
+
+ // rearmType = loop.process??;
+
+ if (rearmType == rearm::REMOVE) {
+ loop.getBaseLock().unlock();
+ this->watch_removed();
+ loop.getBaseLock().lock();
+ }
+ }
+ }
};
template <typename EventLoop>
class timer : private BaseTimerWatcher<typename EventLoop::mutex_t>
{
- private:
- clock_type clock;
+ template <typename, typename> friend class timer_impl;
+ using base_t = BaseTimerWatcher<typename EventLoop::mutex_t>;
public:
void arm_timer(EventLoop &eloop, struct timespec &timeout) noexcept
{
- eloop.setTimer(this, timeout, clock);
+ eloop.setTimer(this, timeout, base_t::clock);
}
void arm_timer(EventLoop &eloop, struct timespec &timeout, struct timespec &interval) noexcept
{
- eloop.setTimer(this, timeout, interval, clock);
+ eloop.setTimer(this, timeout, interval, base_t::clock);
}
// Arm timer, relative to now:
void arm_timer_rel(EventLoop &eloop, struct timespec &timeout) noexcept
{
- eloop.setTimerRel(this, timeout, clock);
+ eloop.setTimerRel(this, timeout, base_t::clock);
}
void arm_timer_rel(EventLoop &eloop, struct timespec &timeout, struct timespec &interval) noexcept
{
- eloop.setTimerRel(this, timeout, interval, clock);
+ eloop.setTimerRel(this, timeout, interval, base_t::clock);
}
+ void stop_timer(EventLoop &eloop) noexcept
+ {
+ eloop.stop_timer(this, base_t::clock);
+ }
+
void deregister(EventLoop &eloop) noexcept
{
- eloop.deregister(this, clock);
+ eloop.deregister(this, this->clock);
}
// Timer expired, and the given number of intervals have elapsed before
- // expiry evenet was queued. Normally intervals == 1 to indicate no
+ // expiry event was queued. Normally intervals == 1 to indicate no
// overrun.
- virtual rearm timer_expiry(EventLoop &eloop, int intervals) = 0;
+ // virtual rearm timer_expiry(EventLoop &eloop, int intervals) = 0;
+};
+
+template <typename EventLoop, typename Derived>
+class timer_impl : public timer<EventLoop>
+{
+ void dispatch(void *loop_ptr) noexcept override
+ {
+ EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
+ loop.getBaseLock().unlock();
+
+ auto rearmType = static_cast<Derived *>(this)->timer_expiry(loop, this->intervals);
+
+ loop.getBaseLock().lock();
+
+ if (rearmType != rearm::REMOVED) {
+
+ this->active = false;
+ if (this->deleteme) {
+ // We don't want a watch that is marked "deleteme" to re-arm itself.
+ rearmType = rearm::REMOVE;
+ }
+
+ loop.processTimerRearm(this, rearmType);
+
+ if (rearmType == rearm::REMOVE) {
+ loop.getBaseLock().unlock();
+ this->watch_removed();
+ loop.getBaseLock().lock();
+ }
+ }
+ }
};
} // namespace dasynq::dprivate
static ServiceSet *service_set = nullptr; // Reference to service set
namespace {
-class BufferedLogStream : public EventLoop_t::fd_watcher
+class BufferedLogStream : public EventLoop_t::fd_watcher_impl<BufferedLogStream>
{
private:
release = false;
}
- rearm fd_event(EventLoop_t &loop, int fd, int flags) noexcept override;
+ rearm fd_event(EventLoop_t &loop, int fd, int flags) noexcept;
// Check whether the console can be released.
void flushForRelease();
using namespace dasynq;
-using EventLoop_t = event_loop<NullMutex>;
+using EventLoop_t = event_loop<null_mutex>;
EventLoop_t eventLoop = EventLoop_t();
void setup_external_log() noexcept;
-class ControlSocketWatcher : public EventLoop_t::fd_watcher
+class ControlSocketWatcher : public EventLoop_t::fd_watcher_impl<ControlSocketWatcher>
{
- rearm fd_event(EventLoop_t &loop, int fd, int flags) override
+ public:
+ rearm fd_event(EventLoop_t &loop, int fd, int flags) noexcept
{
control_socket_cb(&loop, fd);
return rearm::REARM;
namespace {
- class CallbackSignalHandler : public EventLoop_t::signal_watcher
+ class CallbackSignalHandler : public EventLoop_t::signal_watcher_impl<CallbackSignalHandler>
{
public:
typedef void (*cb_func_t)(EventLoop_t *);
this->cb_func = cb_func;
}
- rearm received(EventLoop_t &eloop, int signo, SigInfo_p siginfo) override
+ rearm received(EventLoop_t &eloop, int signo, siginfo_p siginfo)
{
service_set->stop_all_services(ShutdownType::REBOOT);
return rearm::REARM;
}
};
- class ControlSocketWatcher : public EventLoop_t::fd_watcher
+ class ControlSocketWatcher : public EventLoop_t::fd_watcher_impl<ControlSocketWatcher>
{
- rearm fd_event(EventLoop_t &loop, int fd, int flags) override
+ rearm fd_event(EventLoop_t &loop, int fd, int flags)
{
control_socket_cb(&loop, fd);
return rearm::REARM;
sigquit_watcher.setCbFunc(sigterm_cb);
}
- auto sigterm_watcher = CallbackSignalHandler(sigterm_cb);
+ CallbackSignalHandler sigterm_watcher {sigterm_cb};
sigint_watcher.add_watch(eventLoop, SIGINT);
sigquit_watcher.add_watch(eventLoop, SIGQUIT);
return r;
}
-class ServiceChildWatcher : public EventLoop_t::child_proc_watcher
+class ServiceChildWatcher : public EventLoop_t::child_proc_watcher_impl<ServiceChildWatcher>
{
public:
ServiceRecord * service;
- rearm child_status(EventLoop_t &eloop, pid_t child, int status) noexcept override;
+ rearm child_status(EventLoop_t &eloop, pid_t child, int status) noexcept;
ServiceChildWatcher(ServiceRecord * sr) noexcept : service(sr) { }
};
-class ServiceIoWatcher : public EventLoop_t::fd_watcher
+class ServiceIoWatcher : public EventLoop_t::fd_watcher_impl<ServiceIoWatcher>
{
public:
ServiceRecord * service;