Update dasynq library, and make appropriate changes in dinit code.
authorDavin McCall <davmac@davmac.org>
Sun, 12 Mar 2017 23:16:34 +0000 (23:16 +0000)
committerDavin McCall <davmac@davmac.org>
Sun, 12 Mar 2017 23:16:34 +0000 (23:16 +0000)
13 files changed:
src/control.h
src/dasynq/dasynq-binaryheap.h [new file with mode: 0644]
src/dasynq/dasynq-childproc.h
src/dasynq/dasynq-epoll.h
src/dasynq/dasynq-kqueue.h
src/dasynq/dasynq-mutex.h
src/dasynq/dasynq-svec.h [new file with mode: 0644]
src/dasynq/dasynq-timerfd.h [new file with mode: 0644]
src/dasynq/dasynq.h
src/dinit-log.cc
src/dinit.cc
src/service.cc
src/service.h

index 6bfd79053919b7827235358e647acab93a669ff6..9f296331ed79d826034518b6359deefed80327cf 100644 (file)
 // Control connection for dinit
 
 using namespace dasynq;
-using EventLoop_t = EventLoop<NullMutex>;
+using EventLoop_t = event_loop<NullMutex>;
 
 class ControlConn;
 class ControlConnWatcher;
 
 // forward-declaration of callback:
-static Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
+static rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
 
 // Pointer to the control connection that is listening for rollback completion
 extern ControlConn * rollback_handler_conn;
@@ -49,14 +49,14 @@ class ServiceRecord;
 
 class ControlConnWatcher : public EventLoop_t::BidiFdWatcher
 {
-    inline Rearm receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept;
+    inline rearm receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept;
 
-    Rearm readReady(EventLoop_t &loop, int fd) noexcept override
+    rearm readReady(EventLoop_t &loop, int fd) noexcept override
     {
         return receiveEvent(loop, fd, IN_EVENTS);
     }
     
-    Rearm writeReady(EventLoop_t &loop, int fd) noexcept override
+    rearm writeReady(EventLoop_t &loop, int fd) noexcept override
     {
         return receiveEvent(loop, fd, OUT_EVENTS);
     }
@@ -76,7 +76,7 @@ class ControlConnWatcher : public EventLoop_t::BidiFdWatcher
     }
 };
 
-inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept
+inline rearm ControlConnWatcher::receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept
 {
     return control_conn_cb(&loop, this, flags);
 }
@@ -84,7 +84,7 @@ inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t &loop, int fd, int fla
 
 class ControlConn : private ServiceListener
 {
-    friend Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
+    friend rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
     
     ControlConnWatcher iob;
     EventLoop_t *loop;
@@ -216,24 +216,24 @@ class ControlConn : private ServiceListener
 };
 
 
-static Rearm control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents)
+static rearm control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents)
 {
     char * cc_addr = (reinterpret_cast<char *>(watcher)) - offsetof(ControlConn, iob);
     ControlConn *conn = reinterpret_cast<ControlConn *>(cc_addr);
     if (revents & IN_EVENTS) {
         if (conn->dataReady()) {
             delete conn;
-            return Rearm::REMOVED;
+            return rearm::REMOVED;
         }
     }
     if (revents & OUT_EVENTS) {
         if (conn->sendData()) {
             delete conn;
-            return Rearm::REMOVED;
+            return rearm::REMOVED;
         }
     }
     
-    return Rearm::NOOP;
+    return rearm::NOOP;
 }
 
 #endif
diff --git a/src/dasynq/dasynq-binaryheap.h b/src/dasynq/dasynq-binaryheap.h
new file mode 100644 (file)
index 0000000..aafd53d
--- /dev/null
@@ -0,0 +1,256 @@
+#ifndef DASYNC_BINARYHEAP_H_INCLUDED
+#define DASYNC_BINARYHEAP_H_INCLUDED
+
+#include "dasynq-svec.h"
+#include <type_traits>
+#include <functional>
+#include <limits>
+
+namespace dasynq {
+
+/**
+ * Priority queue implementation based on a binary heap.
+ *
+ * Heap entry "handles" maintain an index into the heap. When the position of a node in the heap
+ * changes, its handle must be updated.
+ *
+ * T : node data type
+ * P : priority type (eg int)
+ * Compare : functional object type to compare priorities
+ */
+template <typename T, typename P, typename Compare = std::less<P>>
+class BinaryHeap
+{
+    public:
+    struct handle_t;
+    
+    private:
+    
+    // Actual heap node
+    class HeapNode
+    {
+        public:
+        P data;
+        handle_t * hnd_p;
+        
+        HeapNode(handle_t * hnd, const P &odata) : data(odata), hnd_p(hnd)
+        {
+            // nothing to do
+        }
+    };
+    
+    svector<HeapNode> hvec;
+    
+    using hindex_t = typename decltype(hvec)::size_type;
+
+    int root_node = -1;
+    hindex_t num_nodes = 0;
+    
+    public:
+    
+    // Handle to an element on the heap in the node buffer; also contains the data associated
+    // with the node. (Alternative implementation would be to store the heap data in a
+    // separate container, and have the handle be an index into that container).
+    struct handle_t
+    {
+        T hd;
+        hindex_t heap_index;
+    };
+    
+    // Initialise a handle (if it does not have a suitable constructor). Need not do anything
+    // but may store a sentinel value to mark the handle as inactive. It should not be
+    // necessary to call this, really.
+    static void init_handle(handle_t &h) noexcept
+    {
+    }
+    
+    private:
+    
+    // In hindsight, I probably could have used std::priority_queue rather than re-implementing a
+    // queue here. However, priority_queue doesn't expose the container (except as a protected
+    // member) and so would make it more complicated to reserve storage. It would also have been
+    // necessary to override assignment between elements to correctly update the reference in the
+    // handle.
+    
+    bool bubble_down()
+    {
+        return bubble_down(hvec.size() - 1);
+    }
+    
+    // Bubble a newly added timer down to the correct position
+    bool bubble_down(hindex_t pos)
+    {
+        // int pos = v.size() - 1;
+        Compare lt;
+        while (pos > 0) {
+            hindex_t parent = (pos - 1) / 2;
+            if (! lt(hvec[pos].data, hvec[parent].data)) {
+                break;
+            }
+            
+            std::swap(hvec[pos], hvec[parent]);
+            std::swap(hvec[pos].hnd_p->heap_index, hvec[parent].hnd_p->heap_index);
+            pos = parent;
+        }
+        
+        return pos == 0;
+    }
+    
+    void bubble_up(hindex_t pos = 0)
+    {
+        Compare lt;
+        hindex_t rmax = hvec.size();
+        hindex_t max = (rmax - 1) / 2;
+
+        while (pos <= max) {
+            hindex_t selchild;
+            hindex_t lchild = pos * 2 + 1;
+            hindex_t rchild = lchild + 1;
+            if (rchild >= rmax) {
+                selchild = lchild;
+            }
+            else {
+                // select the sooner of lchild and rchild
+                selchild = lt(hvec[lchild].data, hvec[rchild].data) ? lchild : rchild;
+            }
+            
+            if (! lt(hvec[selchild].data, hvec[pos].data)) {
+                break;
+            }
+            
+            std::swap(hvec[selchild].hnd_p->heap_index, hvec[pos].hnd_p->heap_index);
+            std::swap(hvec[selchild], hvec[pos]);
+            pos = selchild;
+        }
+    }
+
+    void remove_h(hindex_t hidx)
+    {
+        // bvec[hvec[hidx].data_index].heap_index = -1;
+        hvec[hidx].hnd_p->heap_index = -1;
+        if (hvec.size() != hidx + 1) {
+            // replace the first element with the last:
+            // bvec[hvec.back().data_index].heap_index = hidx;
+            hvec.back().hnd_p->heap_index = hidx;
+            hvec[hidx] = hvec.back();
+            hvec.pop_back();
+            
+            // Now bubble up:
+            bubble_up(hidx);
+        }
+        else {
+            hvec.pop_back();
+        }
+    }
+    
+    public:
+    
+    T & node_data(handle_t & index) noexcept
+    {
+        return index.hd;
+    }
+    
+    // Allocate a slot, but do not incorporate into the heap:
+    //  u... : parameters for data constructor T::T(...)
+    template <typename ...U> void allocate(handle_t & hnd, U... u)
+    {
+        new (& hnd.hd) T(u...);
+        hnd.heap_index = -1;
+        constexpr hindex_t max_allowed = std::numeric_limits<hindex_t>::is_signed ?
+                std::numeric_limits<hindex_t>::max() : ((hindex_t) - 2);
+        
+        if (num_nodes == max_allowed) {
+            throw std::bad_alloc();
+        }
+
+        num_nodes++;
+        
+        if (__builtin_expect(hvec.capacity() < num_nodes, 0)) {
+            hindex_t half_point = max_allowed / 2;
+            try {
+                if (__builtin_expect(num_nodes < half_point, 1)) {
+                    hvec.reserve(num_nodes * 2);
+                }
+                else {
+                    hvec.reserve(max_allowed);
+                }
+            }
+            catch (std::bad_alloc &e) {
+                hvec.reserve(num_nodes);
+            }
+        }
+    }
+    
+    // Deallocate a slot
+    void deallocate(handle_t & index) noexcept
+    {
+        num_nodes--;
+        
+        // shrink the capacity of hvec if num_nodes is sufficiently less than
+        // its current capacity:
+        if (num_nodes < hvec.capacity() / 4) {
+            hvec.shrink_to(num_nodes * 2);
+        }
+    }
+
+    bool insert(handle_t & hnd, P pval = P()) noexcept
+    {
+        hnd.heap_index = hvec.size();
+        hvec.emplace_back(&hnd, pval);
+        return bubble_down();
+    }
+    
+    // Get the root node handle. (Returns a handle_t or reference to handle_t).
+    handle_t & get_root()
+    {
+        return * hvec[0].hnd_p;
+    }
+    
+    P &get_root_priority()
+    {
+        return hvec[0].data;
+    }
+    
+    void pull_root()
+    {
+        remove_h(0);
+    }
+    
+    void remove(handle_t & hnd)
+    {
+        remove_h(hnd.heap_index);
+    }
+    
+    bool empty()
+    {
+        return hvec.empty();
+    }
+    
+    bool is_queued(handle_t hnd)
+    {
+        return hnd.heap_index != (hindex_t) -1;
+    }
+    
+    // Set a node priority. Returns true iff the node becomes the root node (and wasn't before).
+    bool set_priority(handle_t & hnd, const P& p)
+    {
+        int heap_index = hnd.heap_index;
+        
+        Compare lt;
+        if (lt(hvec[heap_index].data, p)) {
+            // Increase key
+            hvec[heap_index].data = p;
+            bubble_up(heap_index);
+            return false;
+        }
+        else {
+            // Decrease key
+            hvec[heap_index].data = p;
+            return bubble_down(heap_index);
+        }
+    }
+};
+
+}
+
+#endif
index af33e2619fc1575d150ae55fc0a05b6feb1eb9bb..722ebd8e6ba2edfcdf5eeb227a75204de5a4d098 100644 (file)
@@ -79,7 +79,7 @@ class pid_map
     }
 };
 
-namespace {
+inline namespace {
     void sigchld_handler(int signum)
     {
         // If SIGCHLD has no handler (is ignored), SIGCHLD signals will
@@ -94,11 +94,12 @@ template <class Base> class ChildProcEvents : public Base
 {
     private:
     pid_map child_waiters;
-
-    using SigInfo = typename Base::SigInfo;
     
     protected:
-    void receiveSignal(SigInfo &siginfo, void *userdata)
+    using SigInfo = typename Base::SigInfo;
+    
+    template <typename T>
+    bool receiveSignal(T & loop_mech, SigInfo &siginfo, void *userdata)
     {
         if (siginfo.get_signo() == SIGCHLD) {
             int status;
@@ -109,9 +110,10 @@ template <class Base> class ChildProcEvents : public Base
                     Base::receiveChildStat(child, status, ent.second);
                 }
             }
+            return false; // leave signal watch enabled
         }
         else {
-            Base::receiveSignal(siginfo, userdata);
+            return Base::receiveSignal(loop_mech, siginfo, userdata);
         }
     }
     
index 763c40a8473dd966d06c618fb0c65b4ec82188dc..e92deb645562d066f4df94f3d4edcd4b2e9fde80 100644 (file)
@@ -97,14 +97,12 @@ template <class Base> class EpollLoop : public Base
                 while (true) {
                     int r = read(sigfd, &siginfo.info, sizeof(siginfo.info));
                     if (r == -1) break;
-                    if (siginfo.get_signo() != SIGCHLD) {
-                        // TODO remove the special exception for SIGCHLD?
-                        sigdelset(&sigmask, siginfo.get_signo());
-                    }
                     auto iter = sigdataMap.find(siginfo.get_signo());
                     if (iter != sigdataMap.end()) {
                         void *userdata = (*iter).second;
-                        Base::receiveSignal(siginfo, userdata);
+                        if (Base::receiveSignal(*this, siginfo, userdata)) {
+                            sigdelset(&sigmask, siginfo.get_signo());
+                        }
                     }
                 }
                 signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
@@ -145,7 +143,7 @@ template <class Base> class EpollLoop : public Base
         }
     }
     
-    // flags:  IN_EVENTS | OUT_EVENTS
+    // flags:  IN_EVENTS | OUT_EVENTS | ONE_SHOT
     void addFdWatch(int fd, void *userdata, int flags, bool enabled = true)
     {
         struct epoll_event epevent;
index ad447080d349faf29015598bb408d36fc8bbcb89..4682f1976f7af5697313216f6f5b35792c69309c 100644 (file)
@@ -132,16 +132,13 @@ template <class Base> class KqueueLoop : public Base
                 timeout.tv_sec = 0;
                 timeout.tv_nsec = 0;
                 if (sigtimedwait(&sset, &siginfo.info, &timeout) > 0) {
-                    Base::receiveSignal(siginfo, (void *)events[i].udata);
-                }
-                
-                if (events[i].ident != SIGCHLD) {
-                    sigdelset(&sigmask, events[i].ident);
-                    events[i].flags = EV_DISABLE;
-                }
-                else {
-                    // TODO can we remove this SIGCHLD hack?
-                    events[i].flags = EV_ENABLE;
+                    if (Base::receiveSignal(*this, siginfo, (void *)events[i].udata)) {
+                        sigdelset(&sigmask, events[i].ident);
+                        events[i].flags = EV_DISABLE;
+                    }
+                    else {
+                        events[i].flags = EV_ENABLE;
+                    }
                 }
             }
             else if (events[i].filter == EVFILT_READ || events[i].filter == EVFILT_WRITE) {
@@ -348,7 +345,7 @@ template <class Base> class KqueueLoop : public Base
                     //      rather than disabling each individually
                     setFilterEnabled(EVFILT_SIGNAL, rsigno, false);
                 }
-                Base::receiveSignal(siginfo, sigdataMap[rsigno]);
+                Base::receiveSignal(*this, siginfo, sigdataMap[rsigno]);
                 rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout);
             }
         }
index 212e6e29887ee6366b9351401432c76b0554423a..9cf44b3f8aaa7242ab266dff2fb3f509de123a3d 100644 (file)
@@ -1,5 +1,5 @@
-#ifndef D_MUTEX_H_INCLUDED
-#define D_MUTEX_H_INCLUDED
+#ifndef DASYNQ_MUTEX_H_INCLUDED
+#define DASYNQ_MUTEX_H_INCLUDED
 
 //#include <pthread.h>
 #include <mutex>
@@ -40,7 +40,7 @@ using DMutex = std::mutex;
 // A "null" mutex, for which locking / unlocking actually does nothing.
 class NullMutex
 {
-    EMPTY_BODY
+    DASYNQ_EMPTY_BODY
     
     public:
     void lock() { }
diff --git a/src/dasynq/dasynq-svec.h b/src/dasynq/dasynq-svec.h
new file mode 100644 (file)
index 0000000..eb4259e
--- /dev/null
@@ -0,0 +1,163 @@
+#ifndef DASYNQ_SVEC_H_INCLUDED
+#define DASYNC_SVEC_H_INCLUDED
+
+#include <cstddef>
+#include <cstdlib>
+#include <new>
+
+// Vector with possibility to shrink capacity arbitrarily
+
+namespace dasynq {
+
+template <typename T>
+class svector
+{
+    private:
+    T * array;
+    size_t size_v;
+    size_t capacity_v;
+
+    void check_capacity()
+    {
+        if (size_v == capacity_v) {
+            // double capacity now:
+            if (capacity_v == 0) capacity_v = 1;
+            T * new_array = (T *) std::malloc(capacity_v * 2 * sizeof(T));
+            if (new_array == nullptr) {
+                throw std::bad_alloc();  
+            }
+            for (size_t i = 0; i < size_v; i++) {
+                new (&new_array[i]) T(std::move(array[i]));
+                array[i].T::~T();
+            }
+            std::free(array);
+            array = new_array;
+            capacity_v *= 2;
+        }
+    }
+
+    public:
+    using size_type = size_t;
+    
+    svector() : array(nullptr), size_v(0), capacity_v(0)
+    {
+    
+    }
+    
+    svector(const svector<T> &other)
+    {
+        capacity_v = other.size_v;
+        size_v = other.size_v;
+        array = (T *) std::malloc(capacity_v * sizeof(T));
+        if (array == nullptr) {
+            throw std::bad_alloc();
+        }
+        for (size_t i = 0; i < size_v; i++) {
+            new (&array[i]) T(other[i]);
+        }
+    }
+    
+    ~svector()
+    {
+        for (size_t i = 0; i < size_v; i++) {
+            array[i].T::~T();
+        }
+        std::free(array);
+    }
+
+    void push_back(const T &t)
+    {
+        check_capacity();
+        new (&array[size_v]) T(t);
+        size_v++;
+    }
+    
+    void push_back(T &&t)
+    {
+        check_capacity();        
+        new (&array[size_v]) T(t);
+        size_v++;
+    }
+    
+    template <typename ...U>
+    void emplace_back(U... args)
+    {
+        check_capacity();
+        new (&array[size_v]) T(args...);
+        size_v++;
+    }
+    
+    void pop_back()
+    {
+        size_v--;
+    }
+    
+    T &operator[](size_t index)
+    {
+        return array[index];
+    }
+    
+    const T &operator[](size_t index) const
+    {
+        return array[index];
+    }
+    
+    size_t size() const
+    {
+        return size_v;
+    }
+    
+    size_t capacity() const
+    {
+        return capacity_v;
+    }
+    
+    bool empty() const
+    {
+        return size_v == 0;
+    }
+    
+    void reserve(size_t amount)
+    {
+        if (capacity_v < amount) {
+            T * new_array = (T *) std::malloc(amount * sizeof(T));
+            if (new_array == nullptr) {
+                throw std::bad_alloc();
+            }
+            for (size_t i = 0; i < size_v; i++) {
+                new (&new_array[i]) T(std::move(array[i]));
+                array[i].T::~T();
+            }
+            std::free(array);
+            array = new_array;
+            capacity_v = amount;
+        }
+    }
+    
+    void shrink_to(size_t amount)
+    {
+        if (capacity_v > amount) {
+            T * new_array = (T *) std::malloc(amount * sizeof(T));
+            if (new_array == nullptr) {
+                return;
+            }
+            for (size_t i = 0; i < size_v; i++) {
+                new (&new_array[i]) T(std::move(array[i]));
+                array[i].T::~T();
+            }
+            std::free(array);
+            array = new_array;
+            capacity_v = amount;            
+        }
+    }
+    
+    T &back()
+    {
+        return array[size_v - 1];
+    }
+};
+
+
+} // namespace
+
+#endif
diff --git a/src/dasynq/dasynq-timerfd.h b/src/dasynq/dasynq-timerfd.h
new file mode 100644 (file)
index 0000000..db597b8
--- /dev/null
@@ -0,0 +1,236 @@
+#include <vector>
+#include <utility>
+
+#include <sys/timerfd.h>
+#include <time.h>
+
+#include "dasynq-binaryheap.h"
+
+namespace dasynq {
+
+// We could use one timerfd per timer, but then we need to differentiate timer
+// descriptors from regular file descriptors when events are reported by the loop
+// mechanism so that we can correctly report a timer event or fd event.
+
+// With a file descriptor or signal, we can use the item itself as the identifier for
+// adding/removing watches. For timers, it's more complicated. When we add a timer,
+// we are given a handle; we need to use this to modify the watch. We delegate the
+// process of allocating a handle to a priority heap implementation (BinaryHeap).
+
+
+class TimerData
+{
+    public:
+    // initial time?
+    struct timespec interval_time; // interval (if 0, one-off timer)
+    int expiry_count;  // number of times expired
+    bool enabled;   // whether timer reports events  
+    void *userdata;
+    
+    TimerData(void *udata = nullptr) : interval_time({0,0}), expiry_count(0), enabled(true), userdata(udata)
+    {
+        // constructor
+    }
+};
+
+class CompareTimespec
+{
+    public:
+    bool operator()(const struct timespec &a, const struct timespec &b)
+    {
+        if (a.tv_sec < b.tv_sec) {
+            return true;
+        }
+        
+        if (a.tv_sec == b.tv_sec) {
+            return a.tv_nsec < b.tv_nsec;
+        }
+        
+        return false;
+    }
+};
+
+using timer_handle_t = BinaryHeap<TimerData, struct timespec, CompareTimespec>::handle_t;
+
+static void init_timer_handle(timer_handle_t &hnd) noexcept
+{
+    BinaryHeap<TimerData, struct timespec, CompareTimespec>::init_handle(hnd);
+}
+
+
+template <class Base> class TimerFdEvents : public Base
+{
+    private:
+    int timerfd_fd = -1;
+
+    BinaryHeap<TimerData, struct timespec, CompareTimespec> timer_queue;
+
+    
+    static int divide_timespec(const struct timespec &num, const struct timespec &den)
+    {
+        // TODO
+        return 0;
+    }
+    
+    // Set the timerfd timeout to match the first timer in the queue (disable the timerfd
+    // if there are no active timers).
+    void set_timer_from_queue()
+    {
+        struct itimerspec newtime;
+        if (timer_queue.empty()) {
+            newtime.it_value = {0, 0};
+            newtime.it_interval = {0, 0};
+        }
+        else {
+            newtime.it_value = timer_queue.get_root_priority();
+            newtime.it_interval = {0, 0};
+        }
+        timerfd_settime(timerfd_fd, TFD_TIMER_ABSTIME, &newtime, nullptr);
+    }
+    
+    public:
+    template <typename T>
+    void receiveFdEvent(T &loop_mech, typename Base::FD_r fd_r, void * userdata, int flags)
+    {
+        if (userdata == &timerfd_fd) {
+            struct timespec curtime;
+            clock_gettime(CLOCK_MONOTONIC, &curtime); // in theory, can't fail on Linux
+            
+            // Peek timer queue; calculate difference between current time and timeout
+            struct timespec * timeout = &timer_queue.get_root_priority();            
+            while (timeout->tv_sec < curtime.tv_sec || (timeout->tv_sec == curtime.tv_sec &&
+                    timeout->tv_nsec <= curtime.tv_nsec)) {
+                // Increment expiry count
+                timer_queue.node_data(timer_queue.get_root()).expiry_count++;
+                // (a periodic timer may have overrun; calculated below).
+                
+                auto thandle = timer_queue.get_root();
+                TimerData &data = timer_queue.node_data(thandle);
+                timespec &interval = data.interval_time;
+                if (interval.tv_sec == 0 && interval.tv_nsec == 0) {
+                    // Non periodic timer
+                    timer_queue.pull_root();
+                    if (data.enabled) {
+                        int expiry_count = data.expiry_count;
+                        data.expiry_count = 0;
+                        Base::receiveTimerExpiry(thandle, data.userdata, expiry_count);
+                    }
+                    if (timer_queue.empty()) {
+                        break;
+                    }
+                }
+                else {
+                    // Periodic timer TODO
+                    // First calculate the overrun in time:
+                    /*
+                    struct timespec diff;
+                    diff.tv_sec = curtime.tv_sec - timeout->tv_sec;
+                    diff.tv_nsec = curtime.tv_nsec - timeout->tv_nsec;
+                    if (diff.tv_nsec < 0) {
+                        diff.tv_nsec += 1000000000;
+                        diff.tv_sec--;
+                    }
+                    */
+                    // Now we have to divide the time overrun by the period to find the
+                    // interval overrun. This requires a division of a value not representable
+                    // as a long...
+                    // TODO use divide_timespec
+                    // TODO better not to remove from queue maybe, but instead mark as inactive,
+                    // adjust timeout, and bubble into correct position
+                    // call Base::receieveTimerEvent
+                    // TODO
+                }
+                
+                // repeat until all expired timeouts processed
+                // timeout = &timer_queue[0].timeout;
+                //  (shouldn't be necessary; address hasn't changed...)
+            }
+            // arm timerfd with timeout from head of queue
+            set_timer_from_queue();
+        }
+        else {
+            Base::receiveFdEvent(loop_mech, fd_r, userdata, flags);
+        }
+    }
+
+    template <typename T> void init(T *loop_mech)
+    {
+        timerfd_fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK);
+        if (timerfd_fd == -1) {
+            throw std::system_error(errno, std::system_category());
+        }
+        loop_mech->addFdWatch(timerfd_fd, &timerfd_fd, IN_EVENTS);
+        Base::init(loop_mech);
+    }
+
+    // Add timer, return handle (TODO: clock id param?)
+    void addTimer(timer_handle_t &h, void *userdata)
+    {
+        timer_queue.allocate(h, userdata);
+    }
+    
+    void removeTimer(timer_handle_t &timer_id) noexcept
+    {
+        removeTimer_nolock(timer_id);
+    }
+    
+    void removeTimer_nolock(timer_handle_t &timer_id) noexcept
+    {
+        if (timer_queue.is_queued(timer_id)) {
+            timer_queue.remove(timer_id);
+        }
+        timer_queue.deallocate(timer_id);
+    }
+    
+    // starts (if not started) a timer to timeout at the given time. Resets the expiry count to 0.
+    //   enable: specifies whether to enable reporting of timeouts/intervals
+    void setTimer(timer_handle_t & timer_id, struct timespec &timeout, struct timespec &interval, bool enable) noexcept
+    {
+        auto &ts = timer_queue.node_data(timer_id);
+        ts.interval_time = interval;
+        ts.expiry_count = 0;
+        ts.enabled = enable;
+
+        if (timer_queue.is_queued(timer_id)) {
+            // Already queued; alter timeout
+            if (timer_queue.set_priority(timer_id, timeout)) {
+                set_timer_from_queue();
+            }
+        }
+        else {
+            if (timer_queue.insert(timer_id, timeout)) {
+                set_timer_from_queue();
+            }
+        }
+        
+        // TODO locking (here and everywhere)
+    }
+
+    // Set timer relative to current time:    
+    void setTimerRel(timer_handle_t & timer_id, struct timespec &timeout, struct timespec &interval, bool enable) noexcept
+    {
+        // TODO consider caching current time somehow; need to decide then when to update cached value.
+        struct timespec curtime;
+        clock_gettime(CLOCK_MONOTONIC, &curtime);
+        curtime.tv_sec += timeout.tv_sec;
+        curtime.tv_nsec += timeout.tv_nsec;
+        if (curtime.tv_nsec > 1000000000) {
+            curtime.tv_nsec -= 1000000000;
+            curtime.tv_sec++;
+        }
+        setTimer(timer_id, curtime, interval, enable);
+    }
+    
+    // Enables or disabling report of timeouts (does not stop timer)
+    void enableTimer(timer_handle_t & timer_id, bool enable) noexcept
+    {
+        enableTimer_nolock(timer_id, enable);
+    }
+    
+    void enableTimer_nolock(timer_handle_t & timer_id, bool enable) noexcept
+    {
+        timer_queue.node_data(timer_id).enabled = enable;
+    }
+};
+
+}
index 6dc785adb4f3967bcb34f0b9ce44aed18d3371c2..560a44e14f39cbf67f3ba3c75f7143e21fb60d4a 100644 (file)
@@ -1,31 +1,37 @@
-#ifndef DASYNC_H_INCLUDED
-#define DASYNC_H_INCLUDED
+#ifndef DASYNQ_H_INCLUDED
+#define DASYNQ_H_INCLUDED
 
 #if defined(__OpenBSD__)
-#define HAVE_KQUEUE 1
+#define DASYNQ_HAVE_KQUEUE 1
 #endif
 
 #if defined(__linux__)
-#define HAVE_EPOLL 1
+#define DASYNQ_HAVE_EPOLL 1
 #endif
 
 #include "dasynq-flags.h"
+#include "dasynq-binaryheap.h"
 
-#if defined(HAVE_KQUEUE)
+#if defined(DASYNQ_CUSTOM_LOOP_IMPLEMENTATION)
+// Loop and LoopTraits defined already; used for testing
+#elif defined(DASYNQ_HAVE_KQUEUE)
 #include "dasynq-kqueue.h"
+#include "dasynq-itimer.h"
 #include "dasynq-childproc.h"
 namespace dasynq {
-    template <typename T> using Loop = KqueueLoop<T>;
+    template <typename T> using Loop = KqueueLoop<ITimerEvents<ChildProcEvents<T>>>;
     using LoopTraits = KqueueTraits;
 }
-#elif defined(HAVE_EPOLL)
+#elif defined(DASYNQ_HAVE_EPOLL)
 #include "dasynq-epoll.h"
+#include "dasynq-timerfd.h"
 #include "dasynq-childproc.h"
 namespace dasynq {
-    template <typename T> using Loop = EpollLoop<T>;
+    template <typename T> using Loop = EpollLoop<TimerFdEvents<ChildProcEvents<T>>>;
     using LoopTraits = EpollTraits;
 }
 #endif
+
 #include <atomic>
 #include <condition_variable>
 #include <cstdint>
@@ -44,9 +50,9 @@ namespace dasynq {
 
 #ifdef __GNUC__
 #ifndef __clang__
-#define EMPTY_BODY    char empty[0];  // Make class instances take up no space (gcc)    
+#define DASYNQ_EMPTY_BODY    char empty[0];  // Make class instances take up no space (gcc)    
 #else
-#define EMPTY_BODY    char empty[0] __attribute__((unused));  // Make class instances take up no space (clang)
+#define DASYNQ_EMPTY_BODY    char empty[0] __attribute__((unused));  // Make class instances take up no space (clang)
 #endif
 #endif
 
@@ -54,11 +60,20 @@ namespace dasynq {
 
 namespace dasynq {
 
+namespace dprivate {
+    class BaseWatcher;
+}
+
+using PrioQueue = BinaryHeap<dprivate::BaseWatcher *, int>;
+
+inline namespace {
+    constexpr int DEFAULT_PRIORITY = 50;
+}
 
 /**
  * Values for rearm/disarm return from event handlers
  */
-enum class Rearm
+enum class rearm
 {
     /** Re-arm the event watcher so that it receives further events */
     REARM,
@@ -73,28 +88,37 @@ enum class Rearm
 // TODO: add a REQUEUE option, which means, "I didn't complete input/output, run me again soon"
 };
 
+// Different timer clock types
+enum class ClockType
+{
+    WALLTIME,
+    MONOTONIC
+};
+
 // Information about a received signal.
 // This is essentially a wrapper for the POSIX siginfo_t; its existence allows for mechanisms that receive
 // equivalent signal information in a different format (eg signalfd on Linux).
 using SigInfo = LoopTraits::SigInfo;
 
 // Forward declarations:
-template <typename T_Mutex> class EventLoop;
+template <typename T_Mutex> class event_loop;
 
 namespace dprivate {
     // (non-public API)
     
-    template <typename T_Mutex> class FdWatcher;
-    template <typename T_Mutex> class BidiFdWatcher;
-    template <typename T_Mutex> class SignalWatcher;
-    template <typename T_Mutex> class ChildProcWatcher;
+    template <typename T_Loop> class FdWatcher;
+    template <typename T_Loop> class BidiFdWatcher;
+    template <typename T_Loop> class SignalWatcher;
+    template <typename T_Loop> class ChildProcWatcher;
+    template <typename T_Loop> class Timer;
     
     enum class WatchType
     {
         SIGNAL,
         FD,
         CHILD,
-        SECONDARYFD
+        SECONDARYFD,
+        TIMER
     };
     
     template <typename T_Mutex, typename Traits> class EventDispatch;
@@ -108,15 +132,20 @@ namespace dprivate {
     class BaseWatcher
     {
         template <typename T_Mutex, typename Traits> friend class EventDispatch;
-        template <typename T_Mutex> friend class dasynq::EventLoop;
+        template <typename T_Mutex> friend class dasynq::event_loop;
         
         protected:
         WatchType watchType;
-        int active : 1;
-        int deleteme : 1;
+        int active : 1;    // currently executing handler?
+        int deleteme : 1;  // delete when handler finished?
         
-        BaseWatcher * prev;
-        BaseWatcher * next;
+        PrioQueue::handle_t heap_handle;
+        int priority;
+        
+        static void set_priority(BaseWatcher &p, int prio)
+        {
+            p.priority = prio;
+        }
         
         public:
         
@@ -125,8 +154,8 @@ namespace dprivate {
         {
             active = false;
             deleteme = false;
-            prev = nullptr;
-            next = nullptr;
+            PrioQueue::init_handle(heap_handle);
+            priority = DEFAULT_PRIORITY;
         }
         
         BaseWatcher(WatchType wt) noexcept : watchType(wt) { }
@@ -149,7 +178,7 @@ namespace dprivate {
     class BaseSignalWatcher : public BaseWatcher
     {
         template <typename M, typename Traits> friend class EventDispatch;
-        friend class dasynq::EventLoop<T_Mutex>;
+        friend class dasynq::event_loop<T_Mutex>;
 
         protected:
         SigInfo siginfo;
@@ -158,14 +187,14 @@ namespace dprivate {
         public:
         typedef SigInfo &SigInfo_p;
         
-        virtual Rearm received(EventLoop<T_Mutex> &eloop, int signo, SigInfo_p siginfo) = 0;
+        virtual rearm received(event_loop<T_Mutex> &eloop, int signo, SigInfo_p siginfo) = 0;
     };
     
     template <typename T_Mutex>
     class BaseFdWatcher : public BaseWatcher
     {
         template <typename, typename Traits> friend class EventDispatch;
-        friend class dasynq::EventLoop<T_Mutex>;
+        friend class dasynq::event_loop<T_Mutex>;
         
         protected:
         int watch_fd;
@@ -177,19 +206,19 @@ namespace dprivate {
         BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { }
         
         public:
-        virtual Rearm fdEvent(EventLoop<T_Mutex> &eloop, int fd, int flags) = 0;
+        virtual rearm fdEvent(event_loop<T_Mutex> &eloop, int fd, int flags) = 0;
     };
     
     template <typename T_Mutex>
     class BaseBidiFdWatcher : public BaseFdWatcher<T_Mutex>
     {
         template <typename, typename Traits> friend class EventDispatch;
-        friend class dasynq::EventLoop<T_Mutex>;
+        friend class dasynq::event_loop<T_Mutex>;
         
         // This should never actually get called:
-        Rearm fdEvent(EventLoop<T_Mutex> &eloop, int fd, int flags) final
+        rearm fdEvent(event_loop<T_Mutex> &eloop, int fd, int flags) final
         {
-            return Rearm::REARM; // should not be reachable.
+            return rearm::REARM; // should not be reachable.
         };
         
         protected:
@@ -202,15 +231,15 @@ namespace dprivate {
         int write_removed : 1; // write watch removed?
         
         public:
-        virtual Rearm readReady(EventLoop<T_Mutex> &eloop, int fd) noexcept = 0;
-        virtual Rearm writeReady(EventLoop<T_Mutex> &eloop, int fd) noexcept = 0;
+        virtual rearm readReady(event_loop<T_Mutex> &eloop, int fd) noexcept = 0;
+        virtual rearm writeReady(event_loop<T_Mutex> &eloop, int fd) noexcept = 0;
     };
     
     template <typename T_Mutex>
     class BaseChildWatcher : public BaseWatcher
     {
         template <typename, typename Traits> friend class EventDispatch;
-        friend class dasynq::EventLoop<T_Mutex>;
+        friend class dasynq::event_loop<T_Mutex>;
         
         protected:
         pid_t watch_pid;
@@ -219,7 +248,30 @@ namespace dprivate {
         BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { }
         
         public:
-        virtual Rearm childStatus(EventLoop<T_Mutex> &eloop, pid_t child, int status) = 0;
+        virtual rearm childStatus(event_loop<T_Mutex> &eloop, pid_t child, int status) = 0;
+    };
+    
+
+    template <typename T_Mutex>
+    class BaseTimerWatcher : public BaseWatcher
+    {
+        template <typename, typename Traits> friend class EventDispatch;
+        friend class dasynq::event_loop<T_Mutex>;
+        
+        protected:
+        timer_handle_t timer_handle;
+        int intervals;
+
+        BaseTimerWatcher() : BaseWatcher(WatchType::TIMER)
+        {
+            init_timer_handle(timer_handle);
+        }
+        
+        public:
+        // Timer expired, and the given number of intervals have elapsed before
+        // expiry evenet was queued. Normally intervals == 1 to indicate no
+        // overrun.
+        virtual rearm timerExpiry(event_loop<T_Mutex> &eloop, int intervals) = 0;
     };
 
     // Classes for implementing a fair(ish) wait queue.
@@ -249,13 +301,13 @@ namespace dprivate {
     template <> class waitqueue_node<NullMutex>
     {
         // Specialised waitqueue_node for NullMutex.
-        // TODO can this be reduced to 0 data members?
         friend class waitqueue<NullMutex>;
-        waitqueue_node * next = nullptr;
         
         public:
         void wait(std::unique_lock<NullMutex> &ul) { }
         void signal() { }
+        
+        DASYNQ_EMPTY_BODY;
     };
 
     template <typename T_Mutex> class waitqueue_node
@@ -276,6 +328,34 @@ namespace dprivate {
         }
     };
 
+    template <> class waitqueue<NullMutex>
+    {
+        public:
+        waitqueue_node<NullMutex> * unqueue()
+        {
+            return nullptr;
+        }
+        
+        waitqueue_node<NullMutex> * getHead()
+        {
+            return nullptr;
+        }
+        
+        bool checkHead(waitqueue_node<NullMutex> &node)
+        {
+            return true;
+        }
+        
+        bool isEmpty()
+        {
+            return true;
+        }
+        
+        void queue(waitqueue_node<NullMutex> *node)
+        {
+        }
+    };
+
     template <typename T_Mutex> class waitqueue
     {
         waitqueue_node<T_Mutex> * tail = nullptr;
@@ -293,6 +373,16 @@ namespace dprivate {
             return head;
         }
         
+        bool checkHead(waitqueue_node<T_Mutex> &node)
+        {
+            return head == &node;
+        }
+        
+        bool isEmpty()
+        {
+            return head == nullptr;
+        }
+        
         void queue(waitqueue_node<T_Mutex> *node)
         {
             if (tail) {
@@ -303,68 +393,65 @@ namespace dprivate {
             }
         }
     };
-
+    
     // This class serves as the base class (mixin) for the AEN mechanism class.
     //
     // The EventDispatch class maintains the queued event data structures. It inserts watchers
     // into the queue when eventes are received (receiveXXX methods).
     template <typename T_Mutex, typename Traits> class EventDispatch : public Traits
     {
-        friend class EventLoop<T_Mutex>;
+        friend class event_loop<T_Mutex>;
 
         // queue data structure/pointer
-        BaseWatcher * first = nullptr;
+        PrioQueue event_queue;
         
         using BaseSignalWatcher = dasynq::dprivate::BaseSignalWatcher<T_Mutex>;
         using BaseFdWatcher = dasynq::dprivate::BaseFdWatcher<T_Mutex>;
         using BaseBidiFdWatcher = dasynq::dprivate::BaseBidiFdWatcher<T_Mutex>;
         using BaseChildWatcher = dasynq::dprivate::BaseChildWatcher<T_Mutex>;
+        using BaseTimerWatcher = dasynq::dprivate::BaseTimerWatcher<T_Mutex>;
         
-        void queueWatcher(BaseWatcher *bwatcher)
+        // Add a watcher into the queuing system (but don't queue it)
+        //   may throw: std::bad_alloc
+        void prepare_watcher(BaseWatcher *bwatcher)
         {
-            // Put in queue:
-            if (first == nullptr) {
-                bwatcher->prev = bwatcher;
-                bwatcher->next = bwatcher;
-                first = bwatcher;
-            }
-            else {
-                first->prev->next = bwatcher;
-                bwatcher->prev = first->prev;
-                first->prev = bwatcher;
-                bwatcher->next = first;
-            }
+            event_queue.allocate(bwatcher->heap_handle, bwatcher);
+        }
+        
+        void queueWatcher(BaseWatcher *bwatcher) noexcept
+        {
+            event_queue.insert(bwatcher->heap_handle, bwatcher->priority);
         }
         
         bool isQueued(BaseWatcher *bwatcher)
         {
-            return bwatcher->prev != nullptr;
+            return event_queue.is_queued(bwatcher->heap_handle);
         }
 
         void dequeueWatcher(BaseWatcher *bwatcher)
         {
-            if (bwatcher->prev == bwatcher) {
-                // Only item in queue
-                first = nullptr;
+            if (event_queue.is_queued(bwatcher->heap_handle)) {
+                event_queue.remove(bwatcher->heap_handle);
             }
-            else {
-                if (first == bwatcher) first = first->next;
-                bwatcher->prev->next = bwatcher->next;
-                bwatcher->next->prev = bwatcher->prev;
-            }
-            
-            bwatcher->prev = nullptr;
-            bwatcher->next = nullptr;
+        }
+        
+        // Remove watcher from the queueing system
+        void release_watcher(BaseWatcher *bwatcher)
+        {
+            event_queue.deallocate(bwatcher->heap_handle);
         }
         
         protected:
         T_Mutex lock;
         
-        void receiveSignal(typename Traits::SigInfo & siginfo, void * userdata)
+        // Receive a signal; return true to disable signal watch or false to leave enabled
+        template <typename T>
+        bool receiveSignal(T &loop_mech, typename Traits::SigInfo & siginfo, void * userdata)
         {
             BaseSignalWatcher * bwatcher = static_cast<BaseSignalWatcher *>(userdata);
             bwatcher->siginfo = siginfo;
             queueWatcher(bwatcher);
+            return true;
         }
         
         template <typename T>
@@ -411,13 +498,23 @@ namespace dprivate {
             queueWatcher(watcher);
         }
         
+        void receiveTimerExpiry(timer_handle_t & timer_handle, void * userdata, int intervals)
+        {
+            BaseTimerWatcher * watcher = static_cast<BaseTimerWatcher *>(userdata);
+            watcher->intervals = intervals;
+            queueWatcher(watcher);
+        }
+        
         // Pull a single event from the queue
         BaseWatcher * pullEvent()
         {
-            BaseWatcher * r = first;
-            if (r != nullptr) {
-                dequeueWatcher(r);
+            if (event_queue.empty()) {
+                return nullptr;
             }
+            
+            auto rhndl = event_queue.get_root();
+            BaseWatcher *r = event_queue.node_data(rhndl);
+            event_queue.pull_root();
             return r;
         }
         
@@ -433,13 +530,13 @@ namespace dprivate {
                 // If the watcher is active, set deleteme true; the watcher will be removed
                 // at the end of current processing (i.e. when active is set false).
                 watcher->deleteme = true;
+                release_watcher(watcher);
                 lock.unlock();
             }
             else {
                 // Actually do the delete.
-                if (isQueued(watcher)) {
-                    dequeueWatcher(watcher);
-                }
+                dequeueWatcher(watcher);
+                release_watcher(watcher);
                 
                 lock.unlock();
                 watcher->watchRemoved();
@@ -452,24 +549,22 @@ namespace dprivate {
             
             if (watcher->active) {
                 watcher->deleteme = true;
+                release_watcher(watcher);
             }
             else {
-                if (isQueued(watcher)) {
-                    dequeueWatcher(watcher);
-                }
-                
+                dequeueWatcher(watcher);
+                release_watcher(watcher);
                 watcher->read_removed = true;
             }
             
             BaseWatcher *secondary = &(watcher->outWatcher);
             if (secondary->active) {
                 secondary->deleteme = true;
+                release_watcher(watcher);
             }
             else {
-                if (isQueued(secondary)) {
-                    dequeueWatcher(secondary);
-                }
-                
+                dequeueWatcher(secondary);
+                release_watcher(watcher);
                 watcher->write_removed = true;
             }
             
@@ -485,12 +580,13 @@ namespace dprivate {
 }
 
 
-template <typename T_Mutex> class EventLoop
+template <typename T_Mutex> class event_loop
 {
-    friend class dprivate::FdWatcher<EventLoop<T_Mutex>>;
-    friend class dprivate::BidiFdWatcher<EventLoop<T_Mutex>>;
-    friend class dprivate::SignalWatcher<EventLoop<T_Mutex>>;
-    friend class dprivate::ChildProcWatcher<EventLoop<T_Mutex>>;
+    friend class dprivate::FdWatcher<event_loop<T_Mutex>>;
+    friend class dprivate::BidiFdWatcher<event_loop<T_Mutex>>;
+    friend class dprivate::SignalWatcher<event_loop<T_Mutex>>;
+    friend class dprivate::ChildProcWatcher<event_loop<T_Mutex>>;
+    friend class dprivate::Timer<event_loop<T_Mutex>>;
     
     public:
     using LoopTraits = dasynq::LoopTraits;
@@ -504,9 +600,10 @@ template <typename T_Mutex> class EventLoop
     using BaseFdWatcher = dprivate::BaseFdWatcher<T_Mutex>;
     using BaseBidiFdWatcher = dprivate::BaseBidiFdWatcher<T_Mutex>;
     using BaseChildWatcher = dprivate::BaseChildWatcher<T_Mutex>;
+    using BaseTimerWatcher = dprivate::BaseTimerWatcher<T_Mutex>;
     using WatchType = dprivate::WatchType;
     
-    Loop<ChildProcEvents<EventDispatch<T_Mutex, LoopTraits>>> loop_mech;
+    Loop<EventDispatch<T_Mutex, LoopTraits>> loop_mech;
 
     // There is a complex problem with most asynchronous event notification mechanisms
     // when used in a multi-threaded environment. Generally, a file descriptor or other
@@ -563,7 +660,14 @@ template <typename T_Mutex> class EventLoop
     
     void registerSignal(BaseSignalWatcher *callBack, int signo)
     {
-        loop_mech.addSignalWatch(signo, callBack);
+        loop_mech.prepare_watcher(callBack);
+        try {
+            loop_mech.addSignalWatch(signo, callBack);
+        }
+        catch (...) {
+            loop_mech.release_watcher(callBack);
+            throw;
+        }
     }
     
     void deregister(BaseSignalWatcher *callBack, int signo) noexcept
@@ -581,16 +685,37 @@ template <typename T_Mutex> class EventLoop
 
     void registerFd(BaseFdWatcher *callback, int fd, int eventmask, bool enabled)
     {
-        loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, enabled);
+        loop_mech.prepare_watcher(callback);
+        try {
+            loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, enabled);
+        }
+        catch (...) {
+            loop_mech.release_watcher(callback);
+            throw;
+        }
     }
     
     void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask)
     {
-        if (LoopTraits::has_separate_rw_fd_watches) {
-            loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT);
+        loop_mech.prepare_watcher(callback);
+        try {
+            loop_mech.prepare_watcher(&callback->outWatcher);
+            try {
+                if (LoopTraits::has_separate_rw_fd_watches) {
+                    loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT);
+                }
+                else {
+                    loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT);
+                }
+            }
+            catch (...) {
+                loop_mech.release_watcher(&callback->outWatcher);
+                throw;
+            }
         }
-        else {
-            loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT);
+        catch (...) {
+            loop_mech.release_watcher(callback);
+            throw;
         }
     }
     
@@ -645,19 +770,34 @@ template <typename T_Mutex> class EventLoop
         releaseLock(qnode);
     }
     
-    void reserveChildWatch(BaseChildWatcher *callBack)
+    void reserveChildWatch(BaseChildWatcher *callback)
     {
-        loop_mech.reserveChildWatch();
+        loop_mech.prepare_watcher(callback);
+        try {
+            loop_mech.reserveChildWatch();
+        }
+        catch (...) {
+            loop_mech.release_watcher(callback);
+            throw;
+        }
     }
     
-    void unreserve(BaseChildWatcher *callBack)
+    void unreserve(BaseChildWatcher *callback)
     {
         loop_mech.unreserveChildWatch();
+        loop_mech.release_watcher(callback);
     }
     
-    void registerChild(BaseChildWatcher *callBack, pid_t child)
+    void registerChild(BaseChildWatcher *callback, pid_t child)
     {
-        loop_mech.addChildWatch(child, callBack);
+        loop_mech.prepare_watcher(callback);
+        try {
+            loop_mech.addChildWatch(child, callback);
+        }
+        catch (...) {
+            loop_mech.release_watcher(callback);
+            throw;
+        }
     }
     
     void registerReservedChild(BaseChildWatcher *callBack, pid_t child) noexcept
@@ -683,11 +823,55 @@ template <typename T_Mutex> class EventLoop
         releaseLock(qnode);
     }
     
-    void dequeueWatcher(BaseWatcher *watcher) noexcept
+    void registerTimer(BaseTimerWatcher *callback)
     {
-        if (loop_mech.isQueued(watcher)) {
-            loop_mech.dequeueWatcher(watcher);
+        loop_mech.prepare_watcher(callback);
+        try {
+            loop_mech.addTimer(callback->timer_handle, callback);
         }
+        catch (...) {
+            loop_mech.release_watcher(callback);
+        }
+    }
+    
+    void setTimer(BaseTimerWatcher *callBack, struct timespec &timeout)
+    {
+        struct timespec interval {0, 0};
+        loop_mech.setTimer(callBack->timer_handle, timeout, interval, true);
+    }
+    
+    void setTimer(BaseTimerWatcher *callBack, struct timespec &timeout, struct timespec &interval)
+    {
+        loop_mech.setTimer(callBack->timer_handle, timeout, interval, true);
+    }
+
+    void setTimerRel(BaseTimerWatcher *callBack, struct timespec &timeout)
+    {
+        struct timespec interval {0, 0};
+        loop_mech.setTimerRel(callBack->timer_handle, timeout, interval, true);
+    }
+    
+    void setTimerRel(BaseTimerWatcher *callBack, struct timespec &timeout, struct timespec &interval)
+    {
+        loop_mech.setTimerRel(callBack->timer_handle, timeout, interval, true);
+    }
+
+    void deregister(BaseTimerWatcher *callback)
+    {
+        loop_mech.removeTimer(callback->timer_handle);
+        
+        waitqueue_node<T_Mutex> qnode;
+        getAttnLock(qnode);
+        
+        EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
+        ed.issueDelete(callback);
+        
+        releaseLock(qnode);
+    }
+    
+    void dequeueWatcher(BaseWatcher *watcher) noexcept
+    {
+        loop_mech.dequeueWatcher(watcher);
     }
 
     // Acquire the attention lock (when held, ensures that no thread is polling the AEN
@@ -696,9 +880,9 @@ template <typename T_Mutex> class EventLoop
     {
         std::unique_lock<T_Mutex> ulock(wait_lock);
         attn_waitqueue.queue(&qnode);        
-        if (attn_waitqueue.getHead() != &qnode) {
+        if (! attn_waitqueue.checkHead(qnode)) {
             loop_mech.interruptWait();
-            while (attn_waitqueue.getHead() != &qnode) {
+            while (! attn_waitqueue.checkHead(qnode)) {
                 qnode.wait(ulock);
             }
         }
@@ -709,7 +893,7 @@ template <typename T_Mutex> class EventLoop
     void getPollwaitLock(waitqueue_node<T_Mutex> &qnode)
     {
         std::unique_lock<T_Mutex> ulock(wait_lock);
-        if (attn_waitqueue.getHead() == nullptr) {
+        if (attn_waitqueue.isEmpty()) {
             // Queue is completely empty:
             attn_waitqueue.queue(&qnode);
         }
@@ -717,7 +901,7 @@ template <typename T_Mutex> class EventLoop
             wait_waitqueue.queue(&qnode);
         }
         
-        while (attn_waitqueue.getHead() != &qnode) {
+        while (! attn_waitqueue.checkHead(qnode)) {
             qnode.wait(ulock);
         }    
     }
@@ -739,18 +923,18 @@ template <typename T_Mutex> class EventLoop
         }                
     }
     
-    void processSignalRearm(BaseSignalWatcher * bsw, Rearm rearmType)
+    void processSignalRearm(BaseSignalWatcher * bsw, rearm rearmType)
     {
         // Called with lock held
-        if (rearmType == Rearm::REARM) {
+        if (rearmType == rearm::REARM) {
             loop_mech.rearmSignalWatch_nolock(bsw->siginfo.get_signo());
         }
-        else if (rearmType == Rearm::REMOVE) {
+        else if (rearmType == rearm::REMOVE) {
             loop_mech.removeSignalWatch_nolock(bsw->siginfo.get_signo());
         }
     }
 
-    Rearm processFdRearm(BaseFdWatcher * bfw, Rearm rearmType, bool is_multi_watch)
+    rearm processFdRearm(BaseFdWatcher * bfw, rearm rearmType, bool is_multi_watch)
     {
         // Called with lock held;
         //   bdfw->event_flags contains only with pending (queued) events
@@ -758,28 +942,28 @@ template <typename T_Mutex> class EventLoop
         if (is_multi_watch) {
             BaseBidiFdWatcher * bdfw = static_cast<BaseBidiFdWatcher *>(bfw);
             
-            if (rearmType == Rearm::REMOVE) {
+            if (rearmType == rearm::REMOVE) {
                 bdfw->read_removed = 1;
                 bdfw->watch_flags &= ~IN_EVENTS;
                 
                 if (! LoopTraits::has_separate_rw_fd_watches) {
                     if (! bdfw->write_removed) {
-                        return Rearm::NOOP;
+                        return rearm::NOOP;
                     }
                     else {
                         // both removed: actually remove
                         loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */);
-                        return Rearm::REMOVE;
+                        return rearm::REMOVE;
                     }
                 }
                 else {
                     loop_mech.removeFdWatch_nolock(bdfw->watch_fd, IN_EVENTS);
                 }
             }
-            else if (rearmType == Rearm::DISARM) {
+            else if (rearmType == rearm::DISARM) {
                 // Nothing more to do
             }
-            else if (rearmType == Rearm::REARM) {
+            else if (rearmType == rearm::REARM) {
                 bdfw->watch_flags |= IN_EVENTS;
                 
                 if (! LoopTraits::has_separate_rw_fd_watches) {
@@ -799,12 +983,12 @@ template <typename T_Mutex> class EventLoop
             }
             return rearmType;
         }
-        else {
-            if (rearmType == Rearm::REARM) {
+        else { // Not multi-watch:
+            if (rearmType == rearm::REARM) {
                 loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw,
                         (bfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
             }
-            else if (rearmType == Rearm::REMOVE) {
+            else if (rearmType == rearm::REMOVE) {
                 loop_mech.removeFdWatch_nolock(bfw->watch_fd, bfw->watch_flags);
             }
             return rearmType;
@@ -812,32 +996,32 @@ template <typename T_Mutex> class EventLoop
     }
 
     // Process re-arm for the secondary (output) watcher in a Bi-direction Fd watcher.
-    Rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, Rearm rearmType)
+    rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, rearm rearmType)
     {
         // Called with lock held
-        if (rearmType == Rearm::REMOVE) {
+        if (rearmType == rearm::REMOVE) {
             bdfw->write_removed = 1;
             bdfw->watch_flags &= ~OUT_EVENTS;
             
             if (LoopTraits::has_separate_rw_fd_watches) {
                 loop_mech.removeFdWatch_nolock(bdfw->watch_fd, OUT_EVENTS);
-                return bdfw->read_removed ? Rearm::REMOVE : Rearm::NOOP;
+                return bdfw->read_removed ? rearm::REMOVE : rearm::NOOP;
             }
             else {
                 if (! bdfw->read_removed) {
-                    return Rearm::NOOP;
+                    return rearm::NOOP;
                 }
                 else {
                     // both removed: actually remove
                     loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */);
-                    return Rearm::REMOVE;
+                    return rearm::REMOVE;
                 }
             }
         }
-        else if (rearmType == Rearm::DISARM) {
+        else if (rearmType == rearm::DISARM) {
             // Nothing more to do
         }
-        else if (rearmType == Rearm::REARM) {
+        else if (rearmType == rearm::REARM) {
             bdfw->watch_flags |= OUT_EVENTS;
             
             if (! LoopTraits::has_separate_rw_fd_watches) {
@@ -857,6 +1041,17 @@ template <typename T_Mutex> class EventLoop
         }
         return rearmType;
     }
+    
+    void processTimerRearm(BaseTimerWatcher *btw, rearm rearmType)
+    {
+        // Called with lock held
+        if (rearmType == rearm::REARM) {
+            loop_mech.enableTimer_nolock(btw->timer_handle, true);
+        }
+        else if (rearmType == rearm::REMOVE) {
+            loop_mech.removeTimer_nolock(btw->timer_handle);
+        }
+    }
 
     bool processEvents() noexcept
     {
@@ -874,7 +1069,7 @@ template <typename T_Mutex> class EventLoop
             pqueue->active = true;
             active = true;
             
-            Rearm rearmType = Rearm::NOOP;
+            rearm rearmType = rearm::NOOP;
             bool is_multi_watch = false;
             BaseBidiFdWatcher *bbfw = nullptr;
             
@@ -939,18 +1134,23 @@ template <typename T_Mutex> class EventLoop
                 bbfw->event_flags &= ~OUT_EVENTS;
                 break;
             }
+            case WatchType::TIMER: {
+                BaseTimerWatcher *btw = static_cast<BaseTimerWatcher *>(pqueue);
+                rearmType = btw->timerExpiry(*this, btw->intervals);
+                break;
+            }
             default: ;
             }
 
             ed.lock.lock();
             
             // (if REMOVED, we must not touch pqueue at all)
-            if (rearmType != Rearm::REMOVED) {
+            if (rearmType != rearm::REMOVED) {
                 
                 pqueue->active = false;
                 if (pqueue->deleteme) {
                     // We don't want a watch that is marked "deleteme" to re-arm itself.
-                    rearmType = Rearm::REMOVE;
+                    rearmType = rearm::REMOVE;
                 }
                 switch (pqueue->watchType) {
                 case WatchType::SIGNAL:
@@ -962,11 +1162,17 @@ template <typename T_Mutex> class EventLoop
                 case WatchType::SECONDARYFD:
                     rearmType = processSecondaryRearm(bbfw, rearmType);
                     break;
+                case WatchType::TIMER:
+                    processTimerRearm(static_cast<BaseTimerWatcher *>(pqueue), rearmType);
+                    break;
                 default: ;
                 }
                 
-                if (pqueue->deleteme || rearmType == Rearm::REMOVE) {
+                if (rearmType == rearm::REMOVE) {
                     ed.lock.unlock();
+                    // Note that for BidiFd watches, watchRemoved is only called on the primary watch.
+                    // The process function called above only returns Rearm::REMOVE if both primary and
+                    // secondary watches have been removed.
                     (is_multi_watch ? bbfw : pqueue)->watchRemoved();
                     ed.lock.lock();
                 }
@@ -983,10 +1189,11 @@ template <typename T_Mutex> class EventLoop
     public:
     using mutex_t = T_Mutex;
     
-    using FdWatcher = dprivate::FdWatcher<EventLoop<T_Mutex>>;
-    using BidiFdWatcher = dprivate::BidiFdWatcher<EventLoop<T_Mutex>>;
-    using SignalWatcher = dprivate::SignalWatcher<EventLoop<T_Mutex>>;
-    using ChildProcWatcher = dprivate::ChildProcWatcher<EventLoop<T_Mutex>>;
+    using FdWatcher = dprivate::FdWatcher<event_loop<T_Mutex>>;
+    using BidiFdWatcher = dprivate::BidiFdWatcher<event_loop<T_Mutex>>;
+    using SignalWatcher = dprivate::SignalWatcher<event_loop<T_Mutex>>;
+    using ChildProcWatcher = dprivate::ChildProcWatcher<event_loop<T_Mutex>>;
+    using Timer = dprivate::Timer<event_loop<T_Mutex>>;
     
     // using LoopTraits = dasynq::LoopTraits;
     
@@ -1008,8 +1215,8 @@ template <typename T_Mutex> class EventLoop
     }
 };
 
-typedef EventLoop<NullMutex> NEventLoop;
-typedef EventLoop<std::mutex> TEventLoop;
+typedef event_loop<NullMutex> NEventLoop;
+typedef event_loop<std::mutex> TEventLoop;
 
 // from dasync.cc:
 TEventLoop & getSystemLoop();
@@ -1030,9 +1237,10 @@ public:
     // If an attempt is made to register with more than one event loop at
     // a time, behaviour is undefined. The signal should be masked before
     // call.
-    inline void addWatch(EventLoop &eloop, int signo)
+    inline void addWatch(EventLoop &eloop, int signo, int prio = DEFAULT_PRIORITY)
     {
         BaseWatcher::init();
+        this->priority = prio;
         this->siginfo.set_signo(signo);
         eloop.registerSignal(this, signo);
     }
@@ -1042,7 +1250,7 @@ public:
         eloop.deregister(this, this->siginfo.get_signo());
     }
     
-    // virtual Rearm received(EventLoop &, int signo, SigInfo_p info) = 0;
+    // virtual rearm received(EventLoop &, int signo, SigInfo_p info) = 0;
 };
 
 // Posix file descriptor event watcher
@@ -1078,9 +1286,10 @@ class FdWatcher : private dprivate::BaseFdWatcher<typename EventLoop::mutex_t>
     // causes undefined behavior.
     //
     // Can fail with std::bad_alloc or std::system_error.
-    void addWatch(EventLoop &eloop, int fd, int flags, bool enabled = true)
+    void addWatch(EventLoop &eloop, int fd, int flags, bool enabled = true, int prio = DEFAULT_PRIORITY)
     {
         BaseWatcher::init();
+        this->priority = prio;
         this->watch_fd = fd;
         this->watch_flags = flags;
         eloop.registerFd(this, fd, flags, enabled);
@@ -1098,7 +1307,7 @@ class FdWatcher : private dprivate::BaseFdWatcher<typename EventLoop::mutex_t>
     // (which will not occur until the event handler returns, if it is active).
     // In a single threaded environment, it is safe to delete the watcher after
     // calling this method as long as the handler (if it is active) accesses no
-    // internal state and returns Rearm::REMOVED.
+    // internal state and returns rearm::REMOVED.
     void deregister(EventLoop &eloop) noexcept
     {
         eloop.deregister(this, this->watch_fd);
@@ -1113,7 +1322,39 @@ class FdWatcher : private dprivate::BaseFdWatcher<typename EventLoop::mutex_t>
         }
     }
     
-    // virtual Rearm fdEvent(EventLoop<T_Mutex> *, int fd, int flags) = 0;
+    // Add an Fd watch via a lambda. The watch is allocated dynamically and destroys
+    // itself when removed from the event loop.
+    template <typename T>
+    static FdWatcher<EventLoop> *addWatch(EventLoop &eloop, int fd, int flags, T watchHndlr)
+    {
+        class LambdaFdWatcher : public FdWatcher<EventLoop>
+        {
+            private:
+            T watchHndlr;
+            
+            public:
+            LambdaFdWatcher(T watchHandlr_a) : watchHndlr(watchHandlr_a)
+            {
+                //
+            }
+            
+            rearm fdEvent(EventLoop &eloop, int fd, int flags) override
+            {
+                return watchHndlr(eloop, fd, flags);
+            }
+            
+            void watchRemoved() noexcept override
+            {
+                delete this;
+            }
+        };
+        
+        LambdaFdWatcher * lfd = new LambdaFdWatcher(watchHndlr);
+        lfd->addWatch(eloop, fd, flags);
+        return lfd;
+    }
+    
+    // virtual rearm fdEvent(EventLoop<T_Mutex> &, int fd, int flags) = 0;
 };
 
 // A Bi-directional file descriptor watcher with independent read- and write- channels.
@@ -1196,7 +1437,7 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::mu
     // can be any combination of dasynq::IN_EVENTS / dasynq::OUT_EVENTS.
     //
     // Can fail with std::bad_alloc or std::system_error.
-    void addWatch(EventLoop &eloop, int fd, int flags)
+    void addWatch(EventLoop &eloop, int fd, int flags, int inprio = DEFAULT_PRIORITY, int outprio = DEFAULT_PRIORITY)
     {
         BaseWatcher::init();
         this->outWatcher.BaseWatcher::init();
@@ -1204,6 +1445,8 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::mu
         this->watch_flags = flags | dprivate::multi_watch;
         this->read_removed = false;
         this->write_removed = false;
+        this->priority = inprio;
+        this->set_priority(this->outWatcher, outprio);
         eloop.registerFd(this, fd, flags);
     }
     
@@ -1219,14 +1462,14 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::mu
     // (which will not occur until the event handler returns, if it is active).
     // In a single threaded environment, it is safe to delete the watcher after
     // calling this method as long as the handler (if it is active) accesses no
-    // internal state and returns Rearm::REMOVED.
+    // internal state and returns rearm::REMOVED.
     void deregister(EventLoop &eloop) noexcept
     {
         eloop.deregister(this, this->watch_fd);
     }
     
-    // Rearm readReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
-    // Rearm writeReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
+    // rearm readReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
+    // rearm writeReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
 };
 
 // Child process event watcher
@@ -1254,10 +1497,11 @@ class ChildProcWatcher : private dprivate::BaseChildWatcher<typename EventLoop::
     // Registration can fail with std::bad_alloc.
     // Note that in multi-threaded programs, use of this function may be prone to a
     // race condition such that the child terminates before the watcher is registered.
-    void addWatch(EventLoop &eloop, pid_t child)
+    void addWatch(EventLoop &eloop, pid_t child, int prio = DEFAULT_PRIORITY)
     {
         BaseWatcher::init();
         this->watch_pid = child;
+        this->priority = prio;
         eloop.registerChild(this, child);
     }
     
@@ -1265,10 +1509,13 @@ class ChildProcWatcher : private dprivate::BaseChildWatcher<typename EventLoop::
     // after having reserved resources previously (using reserveWith).
     // Registration cannot fail.
     // Note that in multi-threaded programs, use of this function may be prone to a
-    // race condition such that the child terminates before the watcher is registered.
-    void addReserved(EventLoop &eloop, pid_t child) noexcept
+    // race condition such that the child terminates before the watcher is registered;
+    // use the "fork" member function to avoid this.
+    void addReserved(EventLoop &eloop, pid_t child, int prio = DEFAULT_PRIORITY) noexcept
     {
         BaseWatcher::init();
+        this->watch_pid = child;
+        this->priority = prio;
         eloop.registerReservedChild(this, child);
     }
     
@@ -1360,7 +1607,46 @@ class ChildProcWatcher : private dprivate::BaseChildWatcher<typename EventLoop::
         }
     }
     
-    // virtual Rearm childStatus(EventLoop &, pid_t child, int status) = 0;
+    // virtual rearm childStatus(EventLoop &, pid_t child, int status) = 0;
+};
+
+template <typename EventLoop>
+class Timer : private BaseTimerWatcher<typename EventLoop::mutex_t>
+{
+    public:
+    
+    // Allocate a timer (using the MONOTONIC clock)
+    void addTimer(EventLoop &eloop, int prio = DEFAULT_PRIORITY)
+    {
+        this->priority = prio;
+        eloop.registerTimer(this);
+    }
+    
+    void armTimer(EventLoop &eloop, struct timespec &timeout) noexcept
+    {
+        eloop.setTimer(this, timeout);
+    }
+    
+    void armTimer(EventLoop &eloop, struct timespec &timeout, struct timespec &interval) noexcept
+    {
+        eloop.setTimer(this, timeout, interval);
+    }
+
+    // Arm timer, relative to now:
+    void armTimerRel(EventLoop &eloop, struct timespec &timeout) noexcept
+    {
+        eloop.setTimerRel(this, timeout);
+    }
+    
+    void armTimerRel(EventLoop &eloop, struct timespec &timeout, struct timespec &interval) noexcept
+    {
+        eloop.setTimerRel(this, timeout, interval);
+    }
+    
+    void deregister(EventLoop &eloop) noexcept
+    {
+        eloop.deregister(this);
+    }
 };
 
 }  // namespace dasynq::dprivate
index 77aeda73bf5355b3e33dfa070a4da1c318f29cd2..af46516d27afdb2583b1ab375a5418d47e3da309 100644 (file)
@@ -50,7 +50,7 @@ class BufferedLogStream : public EventLoop_t::FdWatcher
         release = false;
     }
     
-    Rearm fdEvent(EventLoop_t &loop, int fd, int flags) noexcept override;
+    rearm fdEvent(EventLoop_t &loop, int fd, int flags) noexcept override;
 
     // Check whether the console can be released.
     void flushForRelease();
@@ -119,7 +119,7 @@ void BufferedLogStream::flushForRelease()
     
     // Try to flush any messages that are currently buffered. (Console is non-blocking
     // so it will fail gracefully).
-    if (fdEvent(eventLoop, fd, OUT_EVENTS) == Rearm::DISARM) {
+    if (fdEvent(eventLoop, fd, OUT_EVENTS) == rearm::DISARM) {
         // Console has already been released at this point.
         setEnabled(eventLoop, false);
     }
@@ -127,7 +127,7 @@ void BufferedLogStream::flushForRelease()
     // release when it's finished.
 }
 
-Rearm BufferedLogStream::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept
+rearm BufferedLogStream::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept
 {
     if ((! partway) && (! special) && discarded) {
         special_buf = "dinit: *** message discarded due to full buffer ****\n";
@@ -147,25 +147,25 @@ Rearm BufferedLogStream::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept
                 
                 if (release) {
                     release_console();
-                    return Rearm::DISARM;
+                    return rearm::DISARM;
                 }
             }
             else {
                 msg_index += r;
-                return Rearm::REARM;
+                return rearm::REARM;
             }
         }
         else if (errno != EAGAIN && errno != EINTR && errno != EWOULDBLOCK) {
-            return Rearm::REMOVE;
+            return rearm::REMOVE;
         }
-        return Rearm::REARM;
+        return rearm::REARM;
     }
     else {
         // Writing from the regular circular buffer
         
         if (current_index == 0) {
             release_console();
-            return Rearm::DISARM;
+            return rearm::DISARM;
         }
         
         // We try to find a complete line (terminated by '\n') in the buffer, and write it
@@ -218,18 +218,18 @@ Rearm BufferedLogStream::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept
                 if (current_index == 0 || release) {
                     // No more messages buffered / stop logging to console:
                     release_console();
-                    return Rearm::DISARM;
+                    return rearm::DISARM;
                 }
             }
         }
         else if (errno != EAGAIN && errno != EINTR && errno != EWOULDBLOCK) {
-            return Rearm::REMOVE;
+            return rearm::REMOVE;
         }
     }
     
     // We've written something by the time we get here. We could fall through to below, but
     // let's give other events a chance to be processed by returning now.
-    return Rearm::REARM;
+    return rearm::REARM;
 }
 
 // Initialise the logging subsystem
index f9baf50d661fffb920248f622a0be98d875ae650..447fff432b69868f3f65517fae41bf79a2310484 100644 (file)
@@ -37,7 +37,7 @@
 
 
 using namespace dasynq;
-using EventLoop_t = EventLoop<NullMutex>;
+using EventLoop_t = event_loop<NullMutex>;
 
 EventLoop_t eventLoop = EventLoop_t();
 
@@ -54,10 +54,10 @@ void setup_external_log() noexcept;
 
 class ControlSocketWatcher : public EventLoop_t::FdWatcher
 {
-    Rearm fdEvent(EventLoop_t &loop, int fd, int flags) override
+    rearm fdEvent(EventLoop_t &loop, int fd, int flags) override
     {
         control_socket_cb(&loop, fd);
-        return Rearm::REARM;
+        return rearm::REARM;
     }
 };
 
@@ -119,19 +119,19 @@ namespace {
             this->cb_func = cb_func;
         }
         
-        Rearm received(EventLoop_t &eloop, int signo, SigInfo_p siginfo) override
+        rearm received(EventLoop_t &eloop, int signo, SigInfo_p siginfo) override
         {
             service_set->stop_all_services(ShutdownType::REBOOT);
-            return Rearm::REARM;
+            return rearm::REARM;
         }
     };
 
     class ControlSocketWatcher : public EventLoop_t::FdWatcher
     {
-        Rearm fdEvent(EventLoop_t &loop, int fd, int flags)
+        rearm fdEvent(EventLoop_t &loop, int fd, int flags)
         {
             control_socket_cb(&loop, fd);
-            return Rearm::REARM;
+            return rearm::REARM;
         }
     };
 }
index a8ea823091abfa3ddb0b01a654f5f063ec15f784..bc0e8a7b9c29877bd52a3f01fdff2431070b6c5f 100644 (file)
@@ -110,7 +110,7 @@ void ServiceRecord::stopped() noexcept
     }
 }
 
-dasynq::Rearm ServiceChildWatcher::childStatus(EventLoop_t &loop, pid_t child, int status) noexcept
+dasynq::rearm ServiceChildWatcher::childStatus(EventLoop_t &loop, pid_t child, int status) noexcept
 {
     ServiceRecord *sr = service;
     
@@ -125,14 +125,14 @@ dasynq::Rearm ServiceChildWatcher::childStatus(EventLoop_t &loop, pid_t child, i
     if (sr->waiting_for_execstat) {
         // We still don't have an exec() status from the forked child, wait for that
         // before doing any further processing.
-        return Rearm::REMOVE;
+        return rearm::REMOVE;
     }
     
     // Must deregister now since handle_exit_status might result in re-launch:
     deregister(loop, child);
     
     sr->handle_exit_status();
-    return Rearm::REMOVED;
+    return rearm::REMOVED;
 }
 
 bool ServiceRecord::do_auto_restart() noexcept
@@ -251,7 +251,7 @@ void ServiceRecord::handle_exit_status() noexcept
     }
 }
 
-Rearm ServiceIoWatcher::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept
+rearm ServiceIoWatcher::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept
 {
     ServiceRecord *sr = service;
     sr->waiting_for_execstat = false;
@@ -293,7 +293,7 @@ Rearm ServiceIoWatcher::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept
     
     sr->service_set->processQueues(true);
     
-    return Rearm::REMOVED;
+    return rearm::REMOVED;
 }
 
 void ServiceRecord::require() noexcept
index 179799f0da1200bc5d28c7bfb9b089ddeba38685..fa9b16ac002d4a8ef82ae543cffe16eea1d6f658 100644 (file)
@@ -186,7 +186,7 @@ class ServiceChildWatcher : public EventLoop_t::ChildProcWatcher
 {
     public:
     ServiceRecord * service;
-    Rearm childStatus(EventLoop_t &eloop, pid_t child, int status) noexcept;
+    rearm childStatus(EventLoop_t &eloop, pid_t child, int status) noexcept;
     
     ServiceChildWatcher(ServiceRecord * sr) noexcept : service(sr) { }
 };
@@ -195,7 +195,7 @@ class ServiceIoWatcher : public EventLoop_t::FdWatcher
 {
     public:
     ServiceRecord * service;
-    Rearm fdEvent(EventLoop_t &eloop, int fd, int flags) noexcept;
+    rearm fdEvent(EventLoop_t &eloop, int fd, int flags) noexcept;
     
     ServiceIoWatcher(ServiceRecord * sr) noexcept : service(sr) { }
 };