// specialised to call one or the other depending on the mutex type:
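+// (pthread_sigmask is the form required for correct behaviour in multi-threaded processes;
+// plain sigprocmask suffices in the single-threaded case, i.e. when null_mutex is used.)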
template <typename T_Mutex> void sigmaskf(int how, const sigset_t *set, sigset_t *oset)
{
- sigprocmask(how, set, oset);
+ pthread_sigmask(how, set, oset);
}
template <> inline void sigmaskf<null_mutex>(int how, const sigset_t *set, sigset_t *oset)
{
- pthread_sigmask(how, set, oset);
+ sigprocmask(how, set, oset);
}
}
// (non-public API)
class base_watcher;
- using prio_queue = NaryHeap<dprivate::base_watcher *, int>;
+ template <typename A, typename B, typename C> using nary_heap_def = nary_heap<A,B,C>;
+ using prio_queue = stable_heap<nary_heap_def, dprivate::base_watcher *, int>;
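+ // The stable_heap wrapper delivers watchers with equal priority in FIFO order
+ // (see dasynq-stableheap.h).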
template <typename T_Loop> class fd_watcher;
template <typename T_Loop> class bidi_fd_watcher;
#ifndef DASYNC_NARYHEAP_H_INCLUDED
#define DASYNC_NARYHEAP_H_INCLUDED
-#include <vector>
+#include "dasynq-svec.h"
#include <type_traits>
#include <functional>
#include <limits>
* Compare : functional object type to compare priorities
*/
template <typename T, typename P, typename Compare = std::less<P>, int N = 16>
-class NaryHeap
+class nary_heap
{
public:
struct handle_t;
HeapNode() { }
};
- std::vector<HeapNode> hvec;
+ svector<HeapNode> hvec;
using hindex_t = typename decltype(hvec)::size_type;
// separate container, and have the handle be an index into that container).
struct handle_t
{
- T hd;
+ union hd_u_t {
+ // The data member is kept in a union so it doesn't get constructed/destructed
+ // automatically, and we can construct it lazily.
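+ // It is constructed with placement new in allocate() and destroyed explicitly in deallocate().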
+ public:
+ hd_u_t() { }
+ ~hd_u_t() { }
+ T hd;
+ } hd_u;
hindex_t heap_index;
};
void remove_h(hindex_t hidx)
{
- // bvec[hvec[hidx].data_index].heap_index = -1;
hvec[hidx].hnd_p->heap_index = -1;
if (hvec.size() != hidx + 1) {
- // replace the first element with the last:
- // bvec[hvec.back().data_index].heap_index = hidx;
-
- //hvec.back().hnd_p->heap_index = hidx;
- //hvec[hidx] = hvec.back();
- //hvec.pop_back();
+ // replace the removed element with the last:
+ hvec[hidx] = hvec.back();
+ hvec[hidx].hnd_p->heap_index = hidx;
+ hvec.pop_back();
// Now bubble up:
- //bubble_up(hidx);
-
- bubble_up(hidx, hvec.size() - 2, *(hvec.back().hnd_p), hvec.back().data);
- hvec.pop_back();
+ bubble_up(hidx);
}
else {
hvec.pop_back();
T & node_data(handle_t & index) noexcept
{
- return index.hd;
+ return index.hd_u.hd;
}
// Allocate a slot, but do not incorporate into the heap:
// u... : parameters for data constructor T::T(...)
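+ // (allocate() only constructs the data; insert() must be called separately to queue the node.)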
template <typename ...U> void allocate(handle_t & hnd, U... u)
{
- new (& hnd.hd) T(u...);
+ new (& hnd.hd_u.hd) T(u...);
hnd.heap_index = -1;
hindex_t max_allowed = hvec.max_size();
void deallocate(handle_t & index) noexcept
{
num_nodes--;
+ index.hd_u.hd.~T();
// shrink the capacity of hvec if num_nodes is sufficiently less than its current capacity. Why
// capacity/4? Because in general, capacity must be at least doubled when it is exceeded to get
// repeatedly as nodes get added and removed.
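+ // (For example: with capacity 64 we only shrink once num_nodes drops below 16, and then only
+ // to num_nodes * 2, so at least half the new capacity remains free and immediate re-growth
+ // is avoided.)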
if (num_nodes < hvec.capacity() / 4) {
- hvec.shrink_to_fit();
+ hvec.shrink_to(num_nodes * 2);
}
}
return hvec.empty();
}
- bool is_queued(handle_t hnd)
+ bool is_queued(handle_t & hnd)
{
return hnd.heap_index != (hindex_t) -1;
}
return bubble_down(heap_index);
}
}
+
+ nary_heap()
+ {
+ // Nothing required
+ }
+
+ nary_heap(const nary_heap &) = delete;
};
}
--- /dev/null
+#ifndef DASYNQ_STABLE_HEAP_INCLUDED
+#define DASYNQ_STABLE_HEAP_INCLUDED 1
+
+// Convert an "unstable" priority queue (which doesn't use FIFO ordering for same-priority elements)
+// into a "stable" queue (which does deliver same-priority elements in FIFO order). This is done by
+// adding a generation counter to each element added to the queue, and using it as a second-order
+// priority key (after the original key).
+//
+// The generation counter is a 64-bit integer and cannot realistically overflow.
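+//
+// For example, if two entries are added with equal priority, the earlier one receives a lower
+// generation number (the counter starts at zero and is incremented on each insert), so the
+// comparator breaks the tie in favour of the earlier entry and it is delivered first.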
+
+#include <cstdint>
+#include <functional>
+#include <utility>
+
+namespace dasynq {
+
+template <typename P>
+class stable_prio
+{
+ public:
+ P p;
+ uint64_t order;
+
+ template <typename ...U>
+ stable_prio(uint64_t o, U... u) : p(u...), order(o)
+ {
+ }
+
+ // zero-argument constructor should not really be needed, but some
+ // heap implementations aren't yet perfect.
+ stable_prio()
+ {
+ }
+};
+
+template <typename P, typename C>
+class compare_stable_prio
+{
+ public:
+ bool operator()(const stable_prio<P> &a, const stable_prio<P> &b)
+ {
+ C lt;
+ if (lt(a.p, b.p)) {
+ return true;
+ }
+ if (lt(b.p, a.p)) {
+ return false;
+ }
+
+ return a.order < b.order;
+ }
+};
+
+
+template <template <typename H1, typename H2, typename H3> class H, typename T, typename P, typename C = std::less<P>>
+class stable_heap : private H<T,stable_prio<P>,compare_stable_prio<P,C>>
+{
+ using Base = H<T,stable_prio<P>,compare_stable_prio<P,C>>;
+
+ // inherit constructors:
+ using Base::Base;
+
+ uint64_t sequence = 0;
+
+ public:
+
+ using handle_t = typename Base::handle_t;
+ using handle_t_r = typename Base::handle_t_r;
+
+ bool insert(handle_t & index, P pval = P())
+ {
+ auto sp = stable_prio<P>(sequence++, pval);
+ return Base::insert(index, sp);
+ }
+
+ template <typename ...U> void allocate(handle_t & hnd, U&& ...u)
+ {
+ Base::allocate(hnd, std::forward<U>(u)...);
+ }
+
+ static void init_handle(handle_t &hndl)
+ {
+ Base::init_handle(hndl);
+ }
+
+ T &node_data(handle_t &hndl)
+ {
+ return Base::node_data(hndl);
+ }
+
+ bool is_queued(handle_t & hnd)
+ {
+ return Base::is_queued(hnd);
+ }
+
+ decltype(std::declval<Base>().get_root()) get_root()
+ {
+ return Base::get_root();
+ }
+
+ void pull_root()
+ {
+ Base::pull_root();
+ }
+
+ void deallocate(handle_t_r index)
+ {
+ Base::deallocate(index);
+ }
+
+ void remove(handle_t_r hnd)
+ {
+ Base::remove(hnd);
+ }
+
+ bool empty()
+ {
+ return Base::empty();
+ }
+};
+
+} // namespace dasynq
+
+#endif
--- /dev/null
+#ifndef DASYNQ_SVEC_H_INCLUDED
+#define DASYNQ_SVEC_H_INCLUDED
+
+#include <cstddef>
+#include <limits>
+#include <utility>
+#include <new>
+
+// Vector with possibility to shrink capacity arbitrarily
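+//
+// (std::vector can only shrink its capacity to the current size, via shrink_to_fit(); svector
+// provides shrink_to(amount), which allows some spare capacity to be retained when shrinking.)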
+
+namespace dasynq {
+
+template <typename T>
+class svector
+{
+ private:
+ T * array;
+ size_t size_v;
+ size_t capacity_v;
+
+ void check_capacity()
+ {
+ if (size_v == capacity_v) {
+ // double capacity now:
+ if (capacity_v == 0) capacity_v = 1;
+ T * new_array = new T[capacity_v * 2];
+ // new[] default-constructs the new elements; move-assign the existing values across:
+ for (size_t i = 0; i < size_v; i++) {
+ new_array[i] = std::move(array[i]);
+ }
+ delete[] array;
+ array = new_array;
+ capacity_v *= 2;
+ }
+ }
+
+ public:
+ using size_type = size_t;
+
+ svector() : array(nullptr), size_v(0), capacity_v(0)
+ {
+
+ }
+
+ svector(const svector<T> &other)
+ {
+ capacity_v = other.size_v;
+ size_v = other.size_v;
+ array = new T[capacity_v];
+ for (size_t i = 0; i < size_v; i++) {
+ array[i] = other[i];
+ }
+ }
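+
+ // The implicitly-generated copy assignment would just copy the array pointer (leading to a
+ // double delete); copy assignment is not needed here, so delete it explicitly:
+ svector<T> & operator=(const svector<T> &other) = delete;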
+
+ ~svector()
+ {
+ // delete[] destroys every element that was constructed by new T[]:
+ delete[] array;
+ }
+
+ void push_back(const T &t)
+ {
+ check_capacity();
+ array[size_v] = t;
+ size_v++;
+ }
+
+ void push_back(T &&t)
+ {
+ check_capacity();
+ array[size_v] = std::move(t);
+ size_v++;
+ }
+
+ template <typename ...U>
+ void emplace_back(U... args)
+ {
+ check_capacity();
+ array[size_v] = T(args...);
+ size_v++;
+ }
+
+ void pop_back()
+ {
+ size_v--;
+ }
+
+ T &operator[](size_t index)
+ {
+ return array[index];
+ }
+
+ const T &operator[](size_t index) const
+ {
+ return array[index];
+ }
+
+ size_t size() const
+ {
+ return size_v;
+ }
+
+ size_t capacity() const
+ {
+ return capacity_v;
+ }
+
+ bool empty() const
+ {
+ return size_v == 0;
+ }
+
+ static size_t max_size() noexcept
+ {
+ return std::numeric_limits<size_type>::max() / sizeof(T);
+
+ // if we were to support allocators:
+ //size_t max = std::allocator_traits<std::allocator<char>>::max_size(std::allocator<T>());
+ //return max / sizeof(T);
+ // (but not / sizeof(T) for C++17 apparently)
+ }
+
+ void reserve(size_t amount)
+ {
+ if (capacity_v < amount) {
+ T * new_array = new T[amount];
+ for (size_t i = 0; i < size_v; i++) {
+ new_array[i] = std::move(array[i]);
+ }
+ delete[] array;
+ array = new_array;
+ capacity_v = amount;
+ }
+ }
+
+ void shrink_to(size_t amount)
+ {
+ if (capacity_v > amount) {
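+ // Allocate with the nothrow form: if allocation fails we simply keep the existing
+ // (larger) array, since shrinking capacity is only an optimisation.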
+ T * new_array = new(std::nothrow) T[amount];
+ if (new_array == nullptr) {
+ return;
+ }
+ for (size_t i = 0; i < size_v; i++) {
+ new_array[i] = std::move(array[i]);
+ }
+ delete[] array;
+ array = new_array;
+ capacity_v = amount;
+ }
+ }
+
+ T &back()
+ {
+ return array[size_v - 1];
+ }
+
+ T* begin()
+ {
+ return array;
+ }
+
+ const T *begin() const
+ {
+ return array;
+ }
+
+ T* end()
+ {
+ return array + size_v;
+ }
+
+ const T *end() const
+ {
+ return array + size_v;
+ }
+};
+
+
+} // namespace
+
+#endif
}
};
-using timer_queue_t = NaryHeap<timer_data, time_val, compare_timespec>;
+using timer_queue_t = nary_heap<timer_data, time_val, compare_timespec>;
using timer_handle_t = timer_queue_t::handle_t;
static inline void init_timer_handle(timer_handle_t &hnd) noexcept
void stop_timer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
{
- timer_queue_t & queue = this->queue_for_clock(clock);
+ timer_queue_t &queue = this->queue_for_clock(clock);
int fd = (clock == clock_type::MONOTONIC) ? timerfd_fd : systemtime_fd;
if (queue.is_queued(timer_id)) {
bool was_first = (&queue.get_root()) == &timer_id;
{
timespec timeout = timeouttv;
timespec interval = intervaltv;
- timer_queue_t queue = this->queue_for_clock(clock);
+ timer_queue_t &queue = this->queue_for_clock(clock);
switch (clock) {
case clock_type::SYSTEM:
#include "dasynq-flags.h"
#include "dasynq-naryheap.h"
+#include "dasynq-stableheap.h"
#include "dasynq-interrupt.h"
#include "dasynq-util.h"
lock.unlock();
}
}
+
+ event_dispatch() { }
+ event_dispatch(const event_dispatch &) = delete;
};
}
auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
ed.lock.lock();
- // So this pulls *all* currently pending events and processes them in the current thread.
- // That's probably good for throughput, but maybe the behaviour should be configurable.
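+ // A limit of zero processes no events; a negative limit means there is no limit.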
+ if (limit == 0) {
+ ed.lock.unlock();
+ return false;
+ }
base_watcher * pqueue = ed.pull_event();
bool active = false;
- while (pqueue != nullptr && limit != 0) {
+ while (pqueue != nullptr) {
pqueue->active = true;
active = true;
}
pqueue->dispatch(this);
+ if (limit > 0) {
+ limit--;
+ if (limit == 0) break;
+ }
pqueue = ed.pull_event();
- if (limit > 0) limit--;
}
ed.lock.unlock();
{
loop_mech.get_time(tv, clock, force_update);
}
+
+ event_loop() { }
+ event_loop(const event_loop &other) = delete;
};
typedef event_loop<null_mutex> event_loop_n;
using namespace dasynq;
using eventloop_t = event_loop<null_mutex>;
-eventloop_t eventLoop = eventloop_t();
+eventloop_t eventLoop;
static void sigint_reboot_cb(eventloop_t &eloop) noexcept;
static void sigquit_cb(eventloop_t &eloop) noexcept;