Upgrade bundled Dasynq to 1.0.1.
authorDavin McCall <davmac@davmac.org>
Mon, 18 Dec 2017 09:28:56 +0000 (09:28 +0000)
committerDavin McCall <davmac@davmac.org>
Mon, 18 Dec 2017 16:23:49 +0000 (16:23 +0000)
src/dasynq/dasynq-basewatchers.h
src/dasynq/dasynq-naryheap.h
src/dasynq/dasynq-stableheap.h [new file with mode: 0644]
src/dasynq/dasynq-svec.h [new file with mode: 0644]
src/dasynq/dasynq-timerbase.h
src/dasynq/dasynq-timerfd.h
src/dasynq/dasynq.h
src/dinit.cc

index cbabe910703c45a1888f0ea35165c17dab385814..2dd0a1d05e4aad5972aaccf74602fd63402c103a 100644 (file)
@@ -14,12 +14,12 @@ namespace dprivate {
     // specialised to call one or the other depending on the mutex type:
     template <typename T_Mutex> void sigmaskf(int how, const sigset_t *set, sigset_t *oset)
     {
-        sigprocmask(how, set, oset);
+        pthread_sigmask(how, set, oset);
     }
 
     template <> inline void sigmaskf<null_mutex>(int how, const sigset_t *set, sigset_t *oset)
     {
-        pthread_sigmask(how, set, oset);
+        sigprocmask(how, set, oset);
     }
 }
 
@@ -51,7 +51,8 @@ namespace dprivate {
     // (non-public API)
 
     class base_watcher;
-    using prio_queue = NaryHeap<dprivate::base_watcher *, int>;
+    template <typename A, typename B, typename C> using nary_heap_def = nary_heap<A,B,C>;
+    using prio_queue = stable_heap<nary_heap_def, dprivate::base_watcher *, int>;
 
     template <typename T_Loop> class fd_watcher;
     template <typename T_Loop> class bidi_fd_watcher;
index 7713b817162a950c99675920886a7bbf209bcb24..df3a4db107802fa511baac6f542b2ba9d66abe61 100644 (file)
@@ -1,7 +1,7 @@
 #ifndef DASYNC_NARYHEAP_H_INCLUDED
 #define DASYNC_NARYHEAP_H_INCLUDED
 
-#include <vector>
+#include "dasynq-svec.h"
 #include <type_traits>
 #include <functional>
 #include <limits>
@@ -28,7 +28,7 @@ namespace dasynq {
  * Compare : functional object type to compare priorities
  */
 template <typename T, typename P, typename Compare = std::less<P>, int N = 16>
-class NaryHeap
+class nary_heap
 {
     public:
     struct handle_t;
@@ -51,7 +51,7 @@ class NaryHeap
         HeapNode() { }
     };
 
-    std::vector<HeapNode> hvec;
+    svector<HeapNode> hvec;
 
     using hindex_t = typename decltype(hvec)::size_type;
 
@@ -65,7 +65,14 @@ class NaryHeap
     // separate container, and have the handle be an index into that container).
     struct handle_t
     {
-        T hd;
+        union hd_u_t {
+            // The data member is kept in a union so it doesn't get constructed/destructed
+            // automatically, and we can construct it lazily.
+            public:
+            hd_u_t() { }
+            ~hd_u_t() { }
+            T hd;
+        } hd_u;
         hindex_t heap_index;
     };
 
@@ -225,21 +232,15 @@ class NaryHeap
 
     void remove_h(hindex_t hidx)
     {
-        // bvec[hvec[hidx].data_index].heap_index = -1;
         hvec[hidx].hnd_p->heap_index = -1;
         if (hvec.size() != hidx + 1) {
-            // replace the first element with the last:
-            // bvec[hvec.back().data_index].heap_index = hidx;
-
-            //hvec.back().hnd_p->heap_index = hidx;
-            //hvec[hidx] = hvec.back();
-            //hvec.pop_back();
+            // replace the removed element with the last:
+            hvec[hidx] = hvec.back();
+            hvec[hidx].hnd_p->heap_index = hidx;
+            hvec.pop_back();
 
             // Now bubble up:
-            //bubble_up(hidx);
-
-            bubble_up(hidx, hvec.size() - 2, *(hvec.back().hnd_p), hvec.back().data);
-            hvec.pop_back();
+            bubble_up(hidx);
         }
         else {
             hvec.pop_back();
@@ -250,14 +251,14 @@ class NaryHeap
 
     T & node_data(handle_t & index) noexcept
     {
-        return index.hd;
+        return index.hd_u.hd;
     }
 
     // Allocate a slot, but do not incorporate into the heap:
     //  u... : parameters for data constructor T::T(...)
     template <typename ...U> void allocate(handle_t & hnd, U... u)
     {
-        new (& hnd.hd) T(u...);
+        new (& hnd.hd_u.hd) T(u...);
         hnd.heap_index = -1;
         hindex_t max_allowed = hvec.max_size();
 
@@ -287,6 +288,7 @@ class NaryHeap
     void deallocate(handle_t & index) noexcept
     {
         num_nodes--;
+        index.hd_u.~hd_u_t();
 
         // shrink the capacity of hvec if num_nodes is sufficiently less than its current capacity. Why
         // capacity/4? Because in general, capacity must be at least doubled when it is exceeded to get
@@ -295,7 +297,7 @@ class NaryHeap
         // repeatedly as nodes get added and removed.
 
         if (num_nodes < hvec.capacity() / 4) {
-            hvec.shrink_to_fit();
+            hvec.shrink_to(num_nodes * 2);
         }
     }
 
@@ -337,7 +339,7 @@ class NaryHeap
         return hvec.empty();
     }
 
-    bool is_queued(handle_t hnd)
+    bool is_queued(handle_t hnd)
     {
         return hnd.heap_index != (hindex_t) -1;
     }
@@ -360,6 +362,13 @@ class NaryHeap
             return bubble_down(heap_index);
         }
     }
+
+    nary_heap()
+    {
+        // Nothing required
+    }
+
+    nary_heap(const nary_heap &) = delete;
 };
 
 }
diff --git a/src/dasynq/dasynq-stableheap.h b/src/dasynq/dasynq-stableheap.h
new file mode 100644 (file)
index 0000000..a93ae79
--- /dev/null
@@ -0,0 +1,122 @@
+#ifndef DASYNQ_STABLE_HEAP_INCLUDED
+#define DASYNQ_STABLE_HEAP_INCLUDED 1
+
+// Convert an "unstable" priority queue (which doesn't use FIFO ordering for same-priority elements)
+// into a "stable" queue (which does deliver same-priority elements in FIFO order). This is done by
+// adding a generation counter to each element added to the queue, and using it as a second-order
+// priority key (after the original key).
+//
+// The generation counter is a 64-bit integer and can not realistically overflow.
+
+#include <functional>
+
+namespace dasynq {
+
+template <typename P>
+class stable_prio
+{
+    public:
+    P p;
+    uint64_t order;
+    
+    template <typename ...U>
+    stable_prio(uint64_t o, U... u) : p(u...), order(o)
+    {
+    }
+    
+    // zero-argument constructor should not really be needed, but some
+    // heap implementations aren't yet perfect.
+    stable_prio()
+    {
+    }
+};
+
+template <typename P, typename C>
+class compare_stable_prio
+{
+    public:
+    bool operator()(const stable_prio<P> &a, const stable_prio<P> &b)
+    {
+        C lt;
+        if (lt(a.p, b.p)) {
+            return true;
+        }
+        if (lt(b.p, a.p)) {
+            return false;
+        }
+        
+        return a.order < b.order;
+    }
+};
+
+
+template <template <typename H1, typename H2, typename H3> class H, typename T, typename P, typename C = std::less<P>>
+class stable_heap : private H<T,stable_prio<P>,compare_stable_prio<P,C>>
+{
+    using Base = H<T,stable_prio<P>,compare_stable_prio<P,C>>;
+    
+    // using H<T,P,compare_stable_prio<P,C>>:H;  // inherit constructors
+    using Base::Base;
+    
+    uint64_t sequence = 0;
+    
+    public:
+    
+    using handle_t = typename Base::handle_t;
+    using handle_t_r = typename Base::handle_t_r;
+    
+    bool insert(handle_t & index, P pval = P())
+    {
+        auto sp = stable_prio<P>(sequence++, pval);
+        return Base::insert(index, sp);
+    }
+
+    template <typename ...U> void allocate(handle_t & hnd, U&& ...u)
+    {
+        return Base::allocate(hnd, u...);
+    }
+
+    static void init_handle(handle_t &hndl)
+    {
+        Base::init_handle(hndl);
+    }
+
+    T &node_data(handle_t &hndl)
+    {
+        return Base::node_data(hndl);
+    }
+
+    bool is_queued(handle_t & hnd)
+    {
+        return Base::is_queued(hnd);
+    }
+
+    decltype(std::declval<Base>().get_root()) get_root()
+    {
+        return Base::get_root();
+    }
+    
+    void pull_root()
+    {
+        Base::pull_root();
+    }
+    
+    void deallocate(handle_t_r index)
+    {
+        Base::deallocate(index);
+    }
+    
+    void remove(handle_t_r hnd)
+    {
+        Base::remove(hnd);
+    }
+    
+    bool empty()
+    {
+        return Base::empty();
+    }
+};
+
+} // namespace dasynq
+
+#endif
diff --git a/src/dasynq/dasynq-svec.h b/src/dasynq/dasynq-svec.h
new file mode 100644 (file)
index 0000000..d32190e
--- /dev/null
@@ -0,0 +1,184 @@
+#ifndef DASYNQ_SVEC_H_INCLUDED
+#define DASYNQ_SVEC_H_INCLUDED
+
+#include <limits>
+#include <utility>
+#include <new>
+
+// Vector with possibility to shrink capacity arbitrarily
+
+namespace dasynq {
+
+template <typename T>
+class svector
+{
+    private:
+    T * array;
+    size_t size_v;
+    size_t capacity_v;
+
+    void check_capacity()
+    {
+        if (size_v == capacity_v) {
+            // double capacity now:
+            if (capacity_v == 0) capacity_v = 1;
+            T * new_array = new T[capacity_v * 2];
+            for (size_t i = 0; i < size_v; i++) {
+                new (&new_array[i]) T(std::move(array[i]));
+                array[i].T::~T();
+            }
+            delete[] array;
+            array = new_array;
+            capacity_v *= 2;
+        }
+    }
+
+    public:
+    using size_type = size_t;
+
+    svector() : array(nullptr), size_v(0), capacity_v(0)
+    {
+
+    }
+
+    svector(const svector<T> &other)
+    {
+        capacity_v = other.size_v;
+        size_v = other.size_v;
+        array = new T[capacity_v];
+        for (size_t i = 0; i < size_v; i++) {
+            new (&array[i]) T(other[i]);
+        }
+    }
+
+    ~svector()
+    {
+        for (size_t i = 0; i < size_v; i++) {
+            array[i].T::~T();
+        }
+        delete[] array;
+    }
+
+    void push_back(const T &t)
+    {
+        check_capacity();
+        new (&array[size_v]) T(t);
+        size_v++;
+    }
+
+    void push_back(T &&t)
+    {
+        check_capacity();
+        new (&array[size_v]) T(t);
+        size_v++;
+    }
+
+    template <typename ...U>
+    void emplace_back(U... args)
+    {
+        check_capacity();
+        new (&array[size_v]) T(args...);
+        size_v++;
+    }
+
+    void pop_back()
+    {
+        size_v--;
+    }
+
+    T &operator[](size_t index)
+    {
+        return array[index];
+    }
+
+    const T &operator[](size_t index) const
+    {
+        return array[index];
+    }
+
+    size_t size() const
+    {
+        return size_v;
+    }
+
+    size_t capacity() const
+    {
+        return capacity_v;
+    }
+
+    bool empty() const
+    {
+        return size_v == 0;
+    }
+
+    static size_t max_size() noexcept
+    {
+        return std::numeric_limits<size_type>::max() / sizeof(T);
+
+        // if we were to support allocators:
+        //size_t max = std::allocator_traits<std::allocator<char>>::max_size(std::allocator<T>());
+        //return max / sizeof(T);
+        //  (but not / sizeof(T) for C++17 apparently)
+    }
+
+    void reserve(size_t amount)
+    {
+        if (capacity_v < amount) {
+            T * new_array = new T[amount];
+            for (size_t i = 0; i < size_v; i++) {
+                new (&new_array[i]) T(std::move(array[i]));
+                array[i].T::~T();
+            }
+            delete[] array;
+            array = new_array;
+            capacity_v = amount;
+        }
+    }
+
+    void shrink_to(size_t amount)
+    {
+        if (capacity_v > amount) {
+            T * new_array = new(std::nothrow) T[amount];
+            if (new_array == nullptr) {
+                return;
+            }
+            for (size_t i = 0; i < size_v; i++) {
+                new (&new_array[i]) T(std::move(array[i]));
+                array[i].T::~T();
+            }
+            delete[] array;
+            array = new_array;
+            capacity_v = amount;
+        }
+    }
+
+    T &back()
+    {
+        return array[size_v - 1];
+    }
+
+    T* begin()
+    {
+        return array;
+    }
+
+    const T *begin() const
+    {
+        return array;
+    }
+
+    T* end()
+    {
+        return array + size_v;
+    }
+
+    const T *end() const
+    {
+        return array + size_v;
+    }
+};
+
+
+} // namespace
+
+#endif
index 3e013dca9d89143e9d426ab99f8a9431cdb76095..e196e4111767006af8c43b3581e2e814676cb844 100644 (file)
@@ -208,7 +208,7 @@ class compare_timespec
     }
 };
 
-using timer_queue_t = NaryHeap<timer_data, time_val, compare_timespec>;
+using timer_queue_t = nary_heap<timer_data, time_val, compare_timespec>;
 using timer_handle_t = timer_queue_t::handle_t;
 
 static inline void init_timer_handle(timer_handle_t &hnd) noexcept
index 025bbba8b9493c3f8ecbe0b1e60a89f379374a3d..c3b465d4c886b57e825f093f5b43bfcfaa2735da 100644 (file)
@@ -141,7 +141,7 @@ template <class Base> class timer_fd_events : public timer_base<Base>
 
     void stop_timer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
     {
-        timer_queue_t & queue = this->queue_for_clock(clock);
+        timer_queue_t &queue = this->queue_for_clock(clock);
         int fd = (clock == clock_type::MONOTONIC) ? timerfd_fd : systemtime_fd;
         if (queue.is_queued(timer_id)) {
             bool was_first = (&queue.get_root()) == &timer_id;
@@ -159,7 +159,7 @@ template <class Base> class timer_fd_events : public timer_base<Base>
     {
         timespec timeout = timeouttv;
         timespec interval = intervaltv;
-        timer_queue_t queue = this->queue_for_clock(clock);
+        timer_queue_t &queue = this->queue_for_clock(clock);
 
         switch (clock) {
         case clock_type::SYSTEM:
index c471fa4546e3037dd978bca3e292b5ba34adfeae..d9038f97f84c2f66da833e241c5ca8ac3c36336f 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "dasynq-flags.h"
 #include "dasynq-naryheap.h"
+#include "dasynq-stableheap.h"
 #include "dasynq-interrupt.h"
 #include "dasynq-util.h"
 
@@ -450,6 +451,9 @@ namespace dprivate {
                 lock.unlock();
             }
         }
+
+        event_dispatch() {  }
+        event_dispatch(const event_dispatch &) = delete;
     };
 }
 
@@ -1158,13 +1162,14 @@ class event_loop
         auto & ed = (event_dispatch<T_Mutex, backend_traits_t> &) loop_mech;
         ed.lock.lock();
         
-        // So this pulls *all* currently pending events and processes them in the current thread.
-        // That's probably good for throughput, but maybe the behaviour should be configurable.
+        if (limit == 0) {
+            return false;
+        }
         
         base_watcher * pqueue = ed.pull_event();
         bool active = false;
         
-        while (pqueue != nullptr && limit != 0) {
+        while (pqueue != nullptr) {
         
             pqueue->active = true;
             active = true;
@@ -1193,8 +1198,11 @@ class event_loop
             }
 
             pqueue->dispatch(this);
+            if (limit > 0) {
+                limit--;
+                if (limit == 0) break;
+            }
             pqueue = ed.pull_event();
-            if (limit > 0) limit--;
         }
         
         ed.lock.unlock();
@@ -1259,6 +1267,9 @@ class event_loop
     {
         loop_mech.get_time(tv, clock, force_update);
     }
+
+    event_loop() { }
+    event_loop(const event_loop &other) = delete;
 };
 
 typedef event_loop<null_mutex> event_loop_n;
index 696271f3c7fca624d2879c01d1b81ee3f7c8c048..86e67dbdac2bfe9dac071904ecfc848c3bf7cd36 100644 (file)
@@ -40,7 +40,7 @@
 using namespace dasynq;
 using eventloop_t = event_loop<null_mutex>;
 
-eventloop_t eventLoop = eventloop_t();
+eventloop_t eventLoop;
 
 static void sigint_reboot_cb(eventloop_t &eloop) noexcept;
 static void sigquit_cb(eventloop_t &eloop) noexcept;