Rip out libev, replace with dasynq (new library written for the purpose).
authorDavin McCall <davmac@davmac.org>
Sun, 5 Jun 2016 00:49:23 +0000 (01:49 +0100)
committerDavin McCall <davmac@davmac.org>
Sun, 5 Jun 2016 00:49:23 +0000 (01:49 +0100)
TODO
src/Makefile
src/control.cc
src/control.h
src/dasync/dasync-aen.h [new file with mode: 0644]
src/dasync/dasync.h [new file with mode: 0644]
src/dasync/dmutex.h [new file with mode: 0644]
src/dinit-log.cc
src/dinit.cc
src/service.cc
src/service.h

diff --git a/TODO b/TODO
index 7aeb2d89189ba4db98b227e80c65d7d6e12b5449..07e9236e379a73fdc1367a563dd5766f3313447f 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,3 +1,4 @@
+* CPBuffer: cleanup methods a bit (fix API)
 * When a PROCESS service process dies, and smooth_recovery is false, probably
   need to force-stop dependents even if the process itself was stopped
   deliberately.
@@ -6,7 +7,8 @@
   - support for listing all services
 * Implement a control utility to start/stop services after dinit has started
   - very basic version exists, needs thorough cleanup
-* Clean up tree a little, move source files into "src"
+* We've replaced libev, so that we don't abort on failure. But now exceptions
+  can be thrown when we register an event watch - need to handle these.
 
 For version 1.0:
 ----------------
@@ -44,7 +46,8 @@ For later:
   sends terminated process IDs over a pipe to Dinit. Finally, it may be possible
   to run dinit (and subprocesses) in a new PID namespace (again linux-only).
 * Allow logging tasks to memory (growing or circular buffer) and later
-  switching to disk logging (allows for filesystem mounted readonly on boot)
+  switching to disk logging (allows for filesystem mounted readonly on boot).
+  But perhaps this is really the responsibility of another daemon.
 * Rate control on process respawn
 * Allow running services with different resource limits, chroot, cgroups,
   namespaces (pid/fs/uid), etc
@@ -62,8 +65,6 @@ Even later / Maybe never:
   factor.
 * Cron-like tasks (if started, they run a sub-task periodically. Stopping the
   task will wait until the sub-task is complete). 
-* Socket activation of services? Not sure if enough non-SystemD derived
-  daemons actually support this to warrant implementing it.
 * Allow to run services attached to virtual tty, allow connection to that tty (ala "screen").
 * SystemD-like handling of filesystem mounts (see autofs documentation in kernel)
   i.e. a mount point gets an autofs attached, and lazily gets mounted when accessed
index 25216eac505c5c846493adf6b450526bb33fad73..2ff0a8185ba04010e05f75fd8f533d5b5c6afdc4 100644 (file)
@@ -21,7 +21,7 @@ dinit-reboot: dinit-reboot.o
        $(CXX) -o dinit-reboot dinit-reboot.o   
 
 $(objects): %.o: %.cc service.h dinit-log.h control.h control-cmds.h cpbuffer.h
-       $(CXX) $(CXXOPTS) -c $< -o $@
+       $(CXX) $(CXXOPTS) -Idasync -c $< -o $@
 
 #install: all
 
index 30dd3166d1445d5f797c84cbdf5dc638a2490591..937269ccc6011322631e6ce3866f202ed73b8ced 100644 (file)
@@ -54,7 +54,7 @@ void ControlConn::processPacket()
         char outbuf[] = { DINIT_RP_BADREQ };
         if (! queuePacket(outbuf, 1)) return;
         bad_conn_close = true;
-        ev_io_set(&iob, iob.fd, EV_WRITE);
+        iob.setWatchFlags(out_events);
     }
     return;
 }
@@ -78,7 +78,7 @@ void ControlConn::processFindLoad(int pktType)
         char badreqRep[] = { DINIT_RP_BADREQ };
         if (! queuePacket(badreqRep, 1)) return;
         bad_conn_close = true;
-        ev_io_set(&iob, iob.fd, EV_WRITE);
+        iob.setWatchFlags(out_events);
         return;
     }
     
@@ -154,7 +154,7 @@ void ControlConn::processStartStop(int pktType)
         char badreqRep[] = { DINIT_RP_BADREQ };
         if (! queuePacket(badreqRep, 1)) return;
         bad_conn_close = true;
-        ev_io_set(&iob, iob.fd, EV_WRITE);
+        iob.setWatchFlags(out_events);
         return;
     }
     else {
@@ -216,7 +216,7 @@ void ControlConn::processUnpinService()
         char badreqRep[] = { DINIT_RP_BADREQ };
         if (! queuePacket(badreqRep, 1)) return;
         bad_conn_close = true;
-        ev_io_set(&iob, iob.fd, EV_WRITE);
+        iob.setWatchFlags(out_events);
         return;
     }
     else {
@@ -283,7 +283,7 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept
             pkt += wr;
             size -= wr;
         }
-        ev_io_set(&iob, iob.fd, EV_READ | EV_WRITE);
+        iob.setWatchFlags(in_events | out_events);
     }
     
     // Create a vector out of the (remaining part of the) packet:
@@ -302,7 +302,7 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept
             delete this;
         }
         else {
-            ev_io_set(&iob, iob.fd, EV_WRITE);
+            iob.setWatchFlags(out_events);
         }
         return false;    
     }
@@ -337,7 +337,7 @@ bool ControlConn::queuePacket(std::vector<char> &&pkt) noexcept
             }
             outpkt_index = wr;
         }
-        ev_io_set(&iob, iob.fd, EV_READ | EV_WRITE);
+        iob.setWatchFlags(in_events | out_events);
     }
     
     try {
@@ -355,7 +355,7 @@ bool ControlConn::queuePacket(std::vector<char> &&pkt) noexcept
             delete this;
         }
         else {
-            ev_io_set(&iob, iob.fd, EV_WRITE);
+            iob.setWatchFlags(out_events);
         }
         return false;
     }
@@ -403,7 +403,7 @@ bool ControlConn::dataReady() noexcept
         // TODO log error?
         // TODO error response?
         bad_conn_close = true;
-        ev_io_set(&iob, iob.fd, EV_WRITE);
+        iob.setWatchFlags(out_events);
     }
     
     return false;
@@ -446,7 +446,7 @@ void ControlConn::sendData() noexcept
         outpkt_index = 0;
         if (outbuf.empty() && ! oom_close) {
             if (! bad_conn_close) {
-                ev_io_set(&iob, iob.fd, EV_READ);
+                iob.setWatchFlags(in_events);
             }
             else {
                 delete this;
@@ -458,7 +458,7 @@ void ControlConn::sendData() noexcept
 ControlConn::~ControlConn() noexcept
 {
     close(iob.fd);
-    ev_io_stop(loop, &iob);
+    iob.deregisterWatch(loop);
     
     // Clear service listeners
     for (auto p : serviceKeyMap) {
index 9f10fedcfa14d780e0b9dd04ad19005c8065e2b2..bf50be59378ace81285887bd0c4325a13e4e53cf 100644 (file)
@@ -5,9 +5,11 @@
 #include <vector>
 #include <unordered_map>
 #include <limits>
+#include <cstddef>
 
 #include <unistd.h>
-#include <ev++.h>
+
+#include "dasync.h"
 
 #include "dinit-log.h"
 #include "control-cmds.h"
 
 // Control connection for dinit
 
+using namespace dasync;
+using EventLoop_t = EventLoop<NullMutex>;
+
 // TODO: Use the input buffer as a circular buffer, instead of chomping data from
 // the front using a data move.
 
-// forward-declaration of callback:
-static void control_conn_cb(struct ev_loop * loop, ev_io * w, int revents);
-
 class ControlConn;
+class ControlConnWatcher;
+
+// forward-declaration of callback:
+static void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
 
 // Pointer to the control connection that is listening for rollback completion
 extern ControlConn * rollback_handler_conn;
@@ -44,12 +50,32 @@ extern int active_control_conns;
 class ServiceSet;
 class ServiceRecord;
 
+// File-descriptor watcher embedded in a ControlConn (as its "iob" member); forwards
+// every reported event to control_conn_cb().
+class ControlConnWatcher : public PosixFdWatcher<NullMutex>
+{
+    // Event callback: hands the event flags to control_conn_cb() and unconditionally
+    // asks for the watch to be re-armed.
+    // NOTE(review): control_conn_cb() can return after the ControlConn (and therefore
+    // this watcher) has been deleted; REARM is still returned in that case - confirm
+    // that the event loop tolerates this.
+    Rearm gotEvent(EventLoop_t * loop, int fd, int flags) override
+    {
+        control_conn_cb(loop, this, flags);
+        return Rearm::REARM;
+    }
+    
+    public:
+    int fd; // TODO this is already stored, find a better way to access it.
+    
+    // Re-export setWatchFlags from the base class so that ControlConn can call it.
+    using PosixFdWatcher<NullMutex>::setWatchFlags;
+    
+    // Record the descriptor in the public "fd" member, then register the watch with
+    // the event loop through the base class.
+    void registerWith(EventLoop_t *loop, int fd, int flags)
+    {
+        this->fd = fd;
+        PosixFdWatcher<NullMutex>::registerWith(loop, fd, flags);
+    }
+};
+
 class ControlConn : private ServiceListener
 {
-    friend void control_conn_cb(struct ev_loop *, ev_io *, int);
+    friend void control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
     
-    struct ev_io iob;
-    struct ev_loop *loop;
+    ControlConnWatcher iob;
+    EventLoop_t *loop;
     ServiceSet *service_set;
     
     bool bad_conn_close = false; // close when finished output?
@@ -122,7 +148,8 @@ class ControlConn : private ServiceListener
     {
         bad_conn_close = true;
         oom_close = true;
-        ev_io_set(&iob, iob.fd, EV_WRITE);
+        iob.setWatchFlags(out_events);
+        //ev_io_set(&iob, iob.fd, EV_WRITE);
     }
     
     // Process service event broadcast.
@@ -155,11 +182,12 @@ class ControlConn : private ServiceListener
     }
     
     public:
-    ControlConn(struct ev_loop * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0)
+    ControlConn(EventLoop_t * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0)
     {
-        ev_io_init(&iob, control_conn_cb, fd, EV_READ);
-        iob.data = this;
-        ev_io_start(loop, &iob);
+        //ev_io_init(&iob, control_conn_cb, fd, EV_READ);
+        //iob.data = this;
+        //ev_io_start(loop, &iob);
+        iob.registerWith(loop, fd, in_events);
         
         active_control_conns++;
     }
@@ -170,16 +198,17 @@ class ControlConn : private ServiceListener
 };
 
 
-static void control_conn_cb(struct ev_loop * loop, ev_io * w, int revents)
+static void control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents)
 {
-    ControlConn *conn = (ControlConn *) w->data;
-    if (revents & EV_READ) {
+    char * cc_addr = (reinterpret_cast<char *>(watcher)) - offsetof(ControlConn, iob);
+    ControlConn *conn = reinterpret_cast<ControlConn *>(cc_addr);
+    if (revents & in_events) {
         if (conn->dataReady()) {
             // ControlConn was deleted
             return;
         }
     }
-    if (revents & EV_WRITE) {
+    if (revents & out_events) {
         conn->sendData();
     }    
 }
diff --git a/src/dasync/dasync-aen.h b/src/dasync/dasync-aen.h
new file mode 100644 (file)
index 0000000..8dae802
--- /dev/null
@@ -0,0 +1,440 @@
+#include <system_error>
+#include <mutex>
+#include <type_traits>
+#include <unordered_map>
+#include <vector>
+
+#include <sys/epoll.h>
+#include <sys/signalfd.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include <unistd.h>
+#include <signal.h>
+
+namespace dasync {
+
+// Event type bits
+constexpr unsigned int in_events = 1;
+constexpr unsigned int out_events = 2;
+constexpr unsigned int err_events = 4;
+
+constexpr unsigned int one_shot = 8;
+
+
+template <class Base> class EpollLoop;
+
+// Helper types describing what the epoll-based mechanism (EpollLoop) hands to its
+// Base class: signal information and file descriptor storage/reference types.
+class EpollTraits
+{
+    template <class Base> friend class EpollLoop;
+
+    public:
+
+    // Wrapper around the signalfd_siginfo record that EpollLoop reads from its
+    // signalfd; EpollLoop (a friend) fills "info" directly.
+    class SigInfo
+    {
+        template <class Base> friend class EpollLoop;
+        
+        struct signalfd_siginfo info;
+        
+        public:
+        // Accessors for individual fields of the underlying record.
+        // NOTE(review): get_ssiptr()/get_ssiaddr() return int; if ssi_ptr/ssi_addr are
+        // wider than int (signalfd(2) documents them as 64-bit) the value is
+        // truncated - confirm whether any caller relies on these.
+        int get_signo() { return info.ssi_signo; }
+        int get_sicode() { return info.ssi_code; }
+        int get_siint() { return info.ssi_int; }
+        int get_ssiptr() { return info.ssi_ptr; }
+        int get_ssiaddr() { return info.ssi_addr; }
+        
+        // Overwrite the stored signal number.
+        void set_signo(int signo) { info.ssi_signo = signo; }
+    };    
+
+    class FD_r;
+
+    // File descriptor optional storage. If the mechanism can return the file descriptor, this
+    // class will be empty, otherwise it can hold a file descriptor.
+    class FD_s {
+        friend class FD_r;
+        
+        // Epoll doesn't return the file descriptor (it can, but it can't return both file
+        // descriptor and user data).
+        int fd;
+    };
+
+    // File descriptor reference (passed to event callback). If the mechanism can return the
+    // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor
+    // must be stored in an FD_s instance.
+    class FD_r {
+        public:
+        // For epoll the descriptor lives in the FD_s storage, so simply read it back.
+        int getFd(FD_s ss)
+        {
+            return ss.fd;
+        }
+    };
+};
+
+
+// Event mechanism built on Linux epoll (for file descriptors) and signalfd (for
+// signals). Received events are forwarded to the Base mixin via its receive*()
+// methods.
+template <class Base> class EpollLoop : public Base
+{
+    int epfd; // epoll fd
+    int sigfd; // signalfd fd; -1 if not initialised
+    sigset_t sigmask;
+
+    std::unordered_map<int, void *> sigdataMap;
+
+    // Base contains:
+    //   lock - a lock that can be used to protect internal structure.
+    //          receive*() methods will be called with lock held.
+    //   receiveSignal(SigInfo &, user *) noexcept
+    //   receiveFdEvent(FD_r, user *, int flags) noexcept
+    
+    using SigInfo = EpollTraits::SigInfo;
+    using FD_r = typename EpollTraits::FD_r;
+    
+    // Dispatch "count" events returned by epoll_wait to the Base class. The Base lock
+    // is held for the duration.
+    void processEvents(epoll_event *events, int count)
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        
+        for (int i = 0; i < count; i++) {
+            void * ptr = events[i].data.ptr;
+            
+            if (ptr == &sigfd) {
+                // Signal: drain everything currently queued on the signalfd.
+                SigInfo siginfo;
+                while (true) {
+                    ssize_t nread = read(sigfd, &siginfo.info, sizeof(siginfo.info));
+                    // Stop on error (including "nothing more to read") and on anything
+                    // other than one complete record.
+                    if (nread != static_cast<ssize_t>(sizeof(siginfo.info))) break;
+                    if (siginfo.get_signo() != SIGCHLD) {
+                        // TODO remove the special exception for SIGCHLD?
+                        sigdelset(&sigmask, siginfo.get_signo());
+                    }
+                    auto iter = sigdataMap.find(siginfo.get_signo());
+                    if (iter != sigdataMap.end()) {
+                        void *userdata = (*iter).second;
+                        Base::receiveSignal(siginfo, userdata);
+                    }
+                }
+                // Apply the (possibly reduced) mask to the signalfd.
+                signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
+            }
+            else {
+                // File descriptor event: translate epoll bits to dasync flags. A hangup
+                // is reported as readability.
+                int flags = 0;
+                if (events[i].events & (EPOLLIN | EPOLLHUP)) flags |= in_events;
+                if (events[i].events & EPOLLOUT) flags |= out_events;
+                if (events[i].events & EPOLLERR) flags |= err_events;
+                Base::receiveFdEvent(*this, FD_r(), ptr, flags);
+            }            
+        }
+    }
+    
+    public:
+    
+    /**
+     * EpollLoop constructor.
+     *
+     * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
+     */
+    EpollLoop() : sigfd(-1)
+    {
+        epfd = epoll_create1(EPOLL_CLOEXEC);
+        if (epfd == -1) {
+            throw std::system_error(errno, std::system_category());
+        }
+        sigemptyset(&sigmask);
+    }
+    
+    ~EpollLoop()
+    {
+        close(epfd);
+        if (sigfd != -1) {
+            close(sigfd);
+        }
+    }
+    
+    // Add a watch for the given file descriptor.
+    // flags:  in_events | out_events (optionally | one_shot)
+    // Throws std::system_error if the descriptor cannot be added to the epoll set.
+    void addFdWatch(int fd, void *userdata, int flags)
+    {
+        struct epoll_event epevent;
+        // epevent.data.fd = fd;
+        epevent.data.ptr = userdata;
+        epevent.events = 0;
+        
+        if (flags & one_shot) {
+            epevent.events = EPOLLONESHOT;
+        }
+        if (flags & in_events) {
+            epevent.events |= EPOLLIN;
+        }
+        if (flags & out_events) {
+            epevent.events |= EPOLLOUT;
+        }
+
+        if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epevent) == -1) {
+            // Throw by value (not "throw new") so that handlers catching
+            // std::system_error by reference actually see the exception.
+            throw std::system_error(errno, std::system_category());
+        }
+    }
+    
+    // Remove the watch for the given file descriptor; failure is ignored.
+    void removeFdWatch(int fd)
+    {
+        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr);
+    }
+    
+    void removeFdWatch_nolock(int fd)
+    {
+        removeFdWatch(fd);
+    }
+    
+    // Note this will *replace* the old flags with the new, that is,
+    // it can enable *or disable* read/write events.
+    // Throws std::system_error if the epoll set cannot be modified.
+    void enableFdWatch(int fd, void *userdata, int flags)
+    {
+        struct epoll_event epevent;
+        // epevent.data.fd = fd;
+        epevent.data.ptr = userdata;
+        epevent.events = 0;
+        
+        if (flags & one_shot) {
+            epevent.events = EPOLLONESHOT;
+        }
+        if (flags & in_events) {
+            epevent.events |= EPOLLIN;
+        }
+        if (flags & out_events) {
+            epevent.events |= EPOLLOUT;
+        }
+
+        if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) {
+            throw std::system_error(errno, std::system_category());
+        }
+    }
+    
+    void enableFdWatch_nolock(int fd, void *userdata, int flags)
+    {
+        enableFdWatch(fd, userdata, flags);
+    }
+    
+    // Stop reporting in/out events for the descriptor without removing it from the
+    // epoll set. Throws std::system_error if the epoll set cannot be modified.
+    void disableFdWatch(int fd)
+    {
+        struct epoll_event epevent;
+        // epevent.data.fd = fd;
+        epevent.data.ptr = nullptr;
+        epevent.events = 0;
+        
+        // Epoll documentation says that hangup will still be reported, need to check
+        // whether this is really the case. Suspect it is really only the case if
+        // EPOLLIN is set.
+        if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) {
+            throw std::system_error(errno, std::system_category());
+        }
+    }
+    
+    // Note signal should be masked before call.
+    // Throws std::system_error if the signalfd cannot be created/updated or cannot be
+    // added to the epoll set (and std::bad_alloc if the userdata map cannot grow).
+    void addSignalWatch(int signo, void *userdata)
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
+        sigdataMap[signo] = userdata;
+
+        // Modify the signal fd to watch the new signal
+        bool was_no_sigfd = (sigfd == -1);
+        sigaddset(&sigmask, signo);
+        // Keep the result in a temporary: on failure an existing signalfd must stay
+        // recorded in sigfd so that it is still closed by the destructor.
+        int newfd = signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
+        if (newfd == -1) {
+            throw std::system_error(errno, std::system_category());
+        }
+        sigfd = newfd;
+        
+        if (was_no_sigfd) {
+            // Add the signalfd to the epoll set.
+            struct epoll_event epevent;
+            epevent.data.ptr = &sigfd;
+            epevent.events = EPOLLIN;
+            // No need for EPOLLONESHOT - we can pull the signals out
+            // as we see them.
+            if (epoll_ctl(epfd, EPOLL_CTL_ADD, sigfd, &epevent) == -1) {
+                // Save errno before close() can change it, and forget the descriptor
+                // so that the destructor does not close it a second time.
+                int saved_errno = errno;
+                close(sigfd);
+                sigfd = -1;
+                throw std::system_error(saved_errno, std::system_category());
+            }
+        }
+    }
+    
+    // Note, called with lock held:
+    void rearmSignalWatch_nolock(int signo) noexcept
+    {
+        sigaddset(&sigmask, signo);
+        signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC);
+    }
+    
+    void removeSignalWatch_nolock(int signo) noexcept
+    {
+        sigdelset(&sigmask, signo);
+        signalfd(sigfd, &sigmask, 0);
+    }
+
+    void removeSignalWatch(int signo) noexcept
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        removeSignalWatch_nolock(signo);
+    }
+    
+    // If events are pending, process an unspecified number of them.
+    // If no events are pending, wait until one event is received and
+    // process this event (and possibly any other events received
+    // simultaneously).
+    // If processing an event removes a watch, there is a possibility
+    // that the watched event will still be reported (if it has
+    // occurred) before pullEvents() returns.
+    //
+    //  do_wait - if false, returns immediately if no events are
+    //            pending.
+    void pullEvents(bool do_wait)
+    {
+        epoll_event events[16];
+        int r = epoll_wait(epfd, events, 16, do_wait ? -1 : 0);
+        if (r == -1 || r == 0) {
+            // signal or no events
+            return;
+        }
+    
+        processEvents(events, r);
+    }
+
+    // If events are pending, process one of them.
+    // If no events are pending, wait until one event is received and
+    // process this event.
+    //
+    //  do_wait - if false, returns immediately if no events are
+    //            pending.    
+    void pullOneEvent(bool do_wait)
+    {
+        epoll_event events[1];
+        int r = epoll_wait(epfd, events, 1, do_wait ? -1 : 0);
+        if (r == -1 || r == 0) {
+            // signal or no events
+            return;
+        }
+    
+        processEvents(events, r);    
+    }
+    
+    // Interrupt any current poll operation (pullEvents/pullOneEvent), causing
+    // it to return immediately.
+    void interruptWait()
+    {
+        // TODO
+    }
+};
+
+// Map of pid_t to void *, with possibility of reserving entries so that mappings can
+// be later added with no danger of allocator exhaustion (bad_alloc).
+class pid_map
+{
+    using pair = std::pair<pid_t, void *>;
+    std::unordered_map<pid_t, void *> base_map;
+    std::vector<pair> backup_vector;
+    
+    // Number of entries in backup_vector that are actually in use (as opposed
+    // to simply reserved). In-use entries occupy indices [0, backup_size).
+    int backup_size = 0;
+    
+    public:
+    // (found?, mapped value) - the value is nullptr when not found.
+    using entry = std::pair<bool, void *>;
+
+    // Look up the value mapped to "key", checking the hash map first and then the
+    // in-use part of the backup vector.
+    entry get(pid_t key) noexcept
+    {
+        auto it = base_map.find(key);
+        if (it == base_map.end()) {
+            // Not in map; look in vector
+            for (int i = 0; i < backup_size; i++) {
+                if (backup_vector[i].first == key) {
+                    return entry(true, backup_vector[i].second);
+                }
+            }
+        
+            return entry(false, nullptr);
+        }
+        
+        return entry(true, it->second);
+    }
+    
+    // Remove the mapping for "key" (if any) and return what was mapped.
+    entry erase(pid_t key) noexcept
+    {
+        auto iter = base_map.find(key);
+        if (iter != base_map.end()) {
+            entry r(true, iter->second);
+            base_map.erase(iter);
+            return r;
+        }
+        for (int i = 0; i < backup_size; i++) {
+            if (backup_vector[i].first == key) {
+                entry r(true, backup_vector[i].second);
+                backup_vector.erase(backup_vector.begin() + i);
+                // The vector just lost an in-use entry; keep the in-use count in
+                // step, otherwise later scans would index past the end of the vector.
+                backup_size--;
+                return r;
+            }
+        }
+        return entry(false, nullptr);
+    }
+    
+    // Reserve room for one mapping to be added later via add_from_reserve().
+    // Throws bad_alloc on reservation failure
+    void reserve()
+    {
+        backup_vector.resize(backup_vector.size() + 1);
+    }
+    
+    void add(pid_t key, void *val) // throws std::bad_alloc
+    {
+        base_map[key] = val;
+    }
+    
+    // Add a mapping using a slot previously obtained with reserve(); cannot fail.
+    void add_from_reserve(pid_t key, void *val) noexcept
+    {
+        try {
+            base_map[key] = val;
+            // The map took the entry, so the reserved (unused, trailing) slot is
+            // no longer needed.
+            backup_vector.pop_back();
+        }
+        catch (std::bad_alloc &) {
+            // We couldn't add into the map, use the reserve:
+            backup_vector[backup_size++] = pair(key, val);
+        }
+    }
+};
+
+// Mixin layer that turns SIGCHLD notifications into per-child termination reports.
+// Any other signal is passed through to Base unchanged.
+template <class Base> class ChildProcEvents : public Base
+{
+    private:
+    // Maps watched child pids to the user data registered for them.
+    pid_map child_waiters;
+
+    using SigInfo = typename Base::SigInfo;
+    
+    protected:
+    // Signal entry point. For SIGCHLD, collect every child that has changed state
+    // (without blocking) and report those that have a registered watch.
+    void receiveSignal(SigInfo &siginfo, void *userdata)
+    {
+        if (siginfo.get_signo() != SIGCHLD) {
+            Base::receiveSignal(siginfo, userdata);
+            return;
+        }
+        
+        for (;;) {
+            int wstatus;
+            pid_t pid = waitpid(-1, &wstatus, WNOHANG);
+            if (pid <= 0) {
+                break;
+            }
+            
+            // The watch is dropped as soon as the child has been collected.
+            pid_map::entry found = child_waiters.erase(pid);
+            if (found.first) {
+                Base::receiveChildStat(pid, wstatus, found.second);
+            }
+        }
+    }
+    
+    public:
+    // Set aside space so that a later addReservedChildWatch() cannot fail.
+    void reserveChildWatch()
+    {
+        child_waiters.reserve();
+    }
+    
+    // Register a watch for the given child (forwards to pid_map::add).
+    void addChildWatch(pid_t child, void *val)
+    {
+        child_waiters.add(child, val);
+    }
+    
+    // Register a watch using a previously made reservation.
+    void addReservedChildWatch(pid_t child, void *val) noexcept
+    {
+        child_waiters.add_from_reserve(child, val);
+    }
+};
+
+} // end namespace
diff --git a/src/dasync/dasync.h b/src/dasync/dasync.h
new file mode 100644 (file)
index 0000000..d9008d7
--- /dev/null
@@ -0,0 +1,858 @@
+#ifndef DASYNC_H_INCLUDED
+#define DASYNC_H_INCLUDED
+
+#include "dasync-aen.h"
+
+#include <atomic>
+#include <condition_variable>
+#include <cstdint>
+#include <cstddef>
+
+#include "dmutex.h"
+
+
+
+// TODO consider using atomic variables instead of explicit locking where appropriate
+
+// Allow optimisation of empty classes by including this in the body:
+// May be included as the last entry for a class which is only
+// _potentially_ empty.
+
+/*
+#ifdef __GNUC__
+#ifdef __clang__
+#define EMPTY_BODY private: char empty_fill[0];
+#else
+#define EMPTY_BODY private: char empty_fill[0];
+#endif
+#else
+#define EMPTY_BODY
+#endif
+*/
+
+namespace dasync {
+
+
+/**
+ * Values for rearm/disarm return from event handlers
+ */
+enum class Rearm
+{
+    /** Re-arm the event watcher so that it receives further events */
+    REARM,
+    /** Disarm the event watcher so that it receives no further events, until it is re-armed explicitly */
+    DISARM,
+    /** Remove the event watcher (and call "removed" callback) */
+    REMOVE,
+    /** Leave in current state */
+    NOOP
+// TODO: add a REMOVED option, which means, "I removed myself, DON'T TOUCH ME"
+};
+
+
+// Forward declarations:
+template <typename T_Mutex> class EventLoop;
+template <typename T_Mutex> class PosixFdWatcher;
+template <typename T_Mutex> class PosixSignalWatcher;
+template <typename T_Mutex> class PosixChildWatcher;
+
+// Information about a received signal.
+// This is essentially a wrapper for the POSIX siginfo_t; its existence allows for mechanisms that receive
+// equivalent signal information in a different format (eg signalfd on Linux).
+using SigInfo = EpollTraits::SigInfo;
+
+namespace dprivate {
+    // (non-public API)
+
+    enum class WatchType
+    {
+        SIGNAL,
+        FD,
+        CHILD,
+        SECONDARYFD
+    };
+    
+    template <typename T_Mutex, typename Traits> class EventDispatch;
+
+    // For FD watchers:
+    // Use this watch flag to indicate that in and out events should be reported separately,
+    // that is, watcher should not be disabled until all watched event types are queued.
+    constexpr static int multi_watch = 4;
+    
+    // Represents a queued event notification
+    class BaseWatcher
+    {
+        template <typename T_Mutex, typename Traits> friend class EventDispatch;
+        template <typename T_Mutex> friend class dasync::EventLoop;
+        
+        protected:
+        WatchType watchType;
+        // One-bit flags. These are unsigned: a signed one-bit field can only hold 0
+        // and -1, so storing "true" in it depends on implementation-defined conversion.
+        unsigned active : 1;
+        unsigned deleteme : 1;
+        
+        // Link to the next watcher in the queue of pending notifications.
+        BaseWatcher * next;
+        
+        public:
+        BaseWatcher(WatchType wt) noexcept : watchType(wt), active(0), deleteme(0), next(nullptr) { }
+        
+        virtual ~BaseWatcher() noexcept { }
+        
+        // Called when the watcher has been removed.
+        // It is guaranteed by the caller that:
+        // - the dispatch method is not currently running
+        // - the dispatch method will not be called.
+        virtual void watchRemoved() noexcept
+        {
+            // TODO this "delete" behaviour could be dependent on a flag, perhaps?
+            // delete this;
+        }
+    };
+    
+    // Base signal event - not part of public API
+    // Abstract base for signal watchers; concrete watchers implement gotSignal().
+    template <typename T_Mutex>
+    class BaseSignalWatcher : public BaseWatcher
+    {
+        template <typename M, typename Traits> friend class EventDispatch;
+        friend class dasync::EventLoop<T_Mutex>;
+
+        protected:
+        // Information about the received signal; stored by
+        // EventDispatch::receiveSignal() before the watcher is queued.
+        SigInfo siginfo;
+        BaseSignalWatcher() : BaseWatcher(WatchType::SIGNAL) { }
+
+        public:
+        typedef SigInfo &SigInfo_p;
+        
+        // Signal callback; the return value is a Rearm request.
+        virtual Rearm gotSignal(EventLoop<T_Mutex> * eloop, int signo, SigInfo_p siginfo) = 0;
+    };
+    
+    // Abstract base for file descriptor watchers; concrete watchers implement
+    // gotEvent().
+    template <typename T_Mutex>
+    class BaseFdWatcher : public BaseWatcher
+    {
+        template <typename, typename Traits> friend class EventDispatch;
+        friend class dasync::EventLoop<T_Mutex>;
+        
+        protected:
+        // NOTE(review): none of these members is initialised here; presumably they are
+        // set when the watch is registered - confirm, since
+        // EventDispatch::receiveFdEvent() ORs new flags into event_flags.
+        int watch_fd;
+        int watch_flags; // tested for multi_watch by EventDispatch::receiveFdEvent()
+        int event_flags; // event flags accumulated by EventDispatch::receiveFdEvent()
+        
+        BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { }
+        
+        public:
+        // Event callback; the return value is a Rearm request.
+        virtual Rearm gotEvent(EventLoop<T_Mutex> * eloop, int fd, int flags) = 0;
+    };
+    
+    // Abstract base for a watcher that handles the read and write sides of one file
+    // descriptor separately; concrete watchers implement readReady()/writeReady().
+    template <typename T_Mutex>
+    class BaseBidiFdWatcher : public BaseFdWatcher<T_Mutex>
+    {
+        template <typename, typename Traits> friend class EventDispatch;
+        friend class dasync::EventLoop<T_Mutex>;
+        
+        // The main instance is the "input" watcher only; we keep a secondary watcher
+        // with a secondary set of flags for the "output" watcher:
+        BaseWatcher outWatcher = BaseWatcher(WatchType::SECONDARYFD);
+        
+        // This should never actually get called:        
+        Rearm gotEvent(EventLoop<T_Mutex> * eloop, int fd, int flags) override
+        {
+            return Rearm::REARM; // should not be reachable.
+        }
+        
+        protected:
+        // One-bit flags, unsigned so that they hold 0/1 rather than 0/-1.
+        unsigned read_removed : 1; // read watch removed?
+        unsigned write_removed : 1; // write watch removed?
+        
+        public:
+        // Callbacks for the input and output side respectively; the return value is
+        // a Rearm request.
+        virtual Rearm readReady(EventLoop<T_Mutex> * eloop, int fd) = 0;
+        virtual Rearm writeReady(EventLoop<T_Mutex> * eloop, int fd) = 0;
+    };
+    
+    // Abstract base for child process watchers; concrete watchers implement
+    // gotTermStat().
+    template <typename T_Mutex>
+    class BaseChildWatcher : public BaseWatcher
+    {
+        template <typename, typename Traits> friend class EventDispatch;
+        friend class dasync::EventLoop<T_Mutex>;
+        
+        protected:
+        // NOTE(review): presumably the watched pid and the status collected for it;
+        // neither is initialised or assigned in this class - confirm against the
+        // friend classes.
+        pid_t watch_pid;
+        int child_status;
+        
+        BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { }
+        
+        public:
+        // Child status callback.
+        virtual void gotTermStat(EventLoop<T_Mutex> * eloop, pid_t child, int status) = 0;
+    };
+
+    // Classes for implementing a fair(ish) wait queue.
+    // A queue node can be signalled when it reaches the head of
+    // the queue.
+
+    template <typename T_Mutex> class waitqueue;
+    template <typename T_Mutex> class waitqueue_node;
+
+    // Select an appropriate condition variable type for a mutex:
+    // condition_variable if mutex is std::mutex, or condition_variable_any
+    // otherwise.
+    template <class T_Mutex> class condvarSelector;
+
+    // std::mutex: use std::condition_variable.
+    template <> class condvarSelector<std::mutex>
+    {
+        public:
+        typedef std::condition_variable condvar;
+    };
+
+    // Any other mutex type: use std::condition_variable_any.
+    template <class T_Mutex> class condvarSelector
+    {
+        public:
+        typedef std::condition_variable_any condvar;
+    };
+
+    // Wait queue node for the NullMutex case: it carries only the queue link, and
+    // both wait() and signal() do nothing.
+    template <> class waitqueue_node<NullMutex>
+    {
+        // Specialised waitqueue_node for NullMutex.
+        // TODO can this be reduced to 0 data members?
+        friend class waitqueue<NullMutex>;
+        waitqueue_node * next = nullptr;
+        
+        public:
+        // No-op.
+        void wait(std::unique_lock<NullMutex> &ul) { }
+        // No-op.
+        void signal() { }
+    };
+
+    // General wait queue node: pairs the queue link with a condition variable of the
+    // type chosen by condvarSelector for T_Mutex.
+    template <typename T_Mutex> class waitqueue_node
+    {
+        typename condvarSelector<T_Mutex>::condvar condvar;
+        friend class waitqueue<T_Mutex>;
+        waitqueue_node * next = nullptr;
+        
+        public:
+        // Wake one thread waiting on this node.
+        void signal()
+        {
+            condvar.notify_one();
+        }
+        
+        // Wait on this node's condition variable using the supplied lock.
+        void wait(std::unique_lock<T_Mutex> &mutex_lock)
+        {
+            condvar.wait(mutex_lock);
+        }
+    };
+
+    // Singly-linked FIFO of waitqueue_node objects. The queue does not own its
+    // nodes; it only links them together.
+    template <typename T_Mutex> class waitqueue
+    {
+        waitqueue_node<T_Mutex> * tail = nullptr;
+        waitqueue_node<T_Mutex> * head = nullptr;
+
+        public:
+        // Remove the current head and return the new head (nullptr if the queue is
+        // now empty). The queue must not be empty when this is called.
+        waitqueue_node<T_Mutex> * unqueue()
+        {
+            head = head->next;
+            if (head == nullptr) {
+                // Queue is empty again: do not keep a pointer to the removed node.
+                tail = nullptr;
+            }
+            return head;
+        }
+        
+        // Return the node at the head of the queue (nullptr if empty).
+        waitqueue_node<T_Mutex> * getHead()
+        {
+            return head;
+        }
+        
+        // Append a node at the tail of the queue.
+        void queue(waitqueue_node<T_Mutex> *node)
+        {
+            if (tail) {
+                tail->next = node;
+            }
+            else {
+                head = node;
+            }
+            // Record the new tail; without this every call would overwrite head and
+            // earlier waiters would be lost.
+            tail = node;
+        }
+    };
+
+    // This class serves as the base class (mixin) for the AEN mechanism class.
+    // Note that EventDispatch, here, and EventLoop (below) are really two sides of one coin;
+    // they do not work independently. The mixin pattern that we use to avoid dynamic dispatch
+    // forces them to be two separate classes, however.
+    //
+    // The EventDispatch class maintains the queued event data structures. It inserts watchers
+    // into the queue when events are received (receiveXXX methods).
+    template <typename T_Mutex, typename Traits> class EventDispatch : public Traits
+    {
+        friend class EventLoop<T_Mutex>;
+
+        // Head of the singly-linked queue of watchers with pending events (linked via
+        // BaseWatcher::next). Explicitly initialised so that an empty queue does not depend
+        // on how the enclosing event loop object happens to be initialised.
+        BaseWatcher * first = nullptr;
+        
+        using BaseSignalWatcher = dasync::dprivate::BaseSignalWatcher<T_Mutex>;
+        using BaseFdWatcher = dasync::dprivate::BaseFdWatcher<T_Mutex>;
+        using BaseBidiFdWatcher = dasync::dprivate::BaseBidiFdWatcher<T_Mutex>;
+        using BaseChildWatcher = dasync::dprivate::BaseChildWatcher<T_Mutex>;
+        
+        // Push a watcher onto the front of the pending-event queue and mark it active.
+        // NOTE(review): no lock is taken here; presumably the caller already holds 'lock' - confirm.
+        void queueWatcher(BaseWatcher *bwatcher)
+        {
+            // TODO 
+            // We can't allow a queued entry to be deleted (due to the single-linked-list used for the queue)
+            // so for now, I'll set it active; but this prevents it being deleted until we can next
+            // process events, so once we have a proper linked list or better structure should probably
+            // remove this:
+            bwatcher->active = true;
+            
+            // Put in queue:
+            BaseWatcher * prev_first = first;
+            first = bwatcher;
+            bwatcher->next = prev_first;
+        }
+
+        protected:
+        T_Mutex lock;
+        
+        // Record the received signal information in the watcher (passed as 'userdata')
+        // and queue the watcher for processing.
+        void receiveSignal(typename Traits::SigInfo & siginfo, void * userdata)
+        {
+            BaseSignalWatcher * bwatcher = static_cast<BaseSignalWatcher *>(userdata);
+            bwatcher->siginfo = siginfo;
+            queueWatcher(bwatcher);
+        }
+        
+        // Record an event on a file descriptor watcher (passed as 'userdata') and queue it.
+        // For a multi-watch (bidirectional) watcher, output events are queued via the
+        // secondary 'outWatcher'; if both directions fired, both watchers are queued.
+        template <typename T>
+        void receiveFdEvent(T &loop_mech, typename Traits::FD_r fd_r, void * userdata, int flags)
+        {
+            BaseFdWatcher * bfdw = static_cast<BaseFdWatcher *>(userdata);
+            
+            bfdw->event_flags |= flags;
+            
+            BaseWatcher * bwatcher = bfdw;
+            
+            bool is_multi_watch = bfdw->watch_flags & multi_watch;
+            if (is_multi_watch) {                
+                BaseBidiFdWatcher *bbdw = static_cast<BaseBidiFdWatcher *>(bwatcher);
+                if (flags & in_events && flags & out_events) {
+                    // Queue the secondary watcher first:
+                    queueWatcher(&bbdw->outWatcher);
+                }
+                else if (flags & out_events) {                
+                    // Use the secondary watcher for queueing:
+                    bwatcher = &(bbdw->outWatcher);
+                }
+            }
+
+            queueWatcher(bwatcher);
+            
+            if (is_multi_watch && bfdw->event_flags != bfdw->watch_flags) {
+                // We need to re-enable the other channel now:
+                loop_mech.enableFdWatch_nolock(bfdw->watch_fd, userdata,
+                    (bfdw->watch_flags & ~(bfdw->event_flags)) | one_shot);
+            }
+        }
+        
+        // Record the reported status in the child watcher (passed as 'userdata') and
+        // queue it. The 'child' argument itself is not used here.
+        void receiveChildStat(pid_t child, int status, void * userdata)
+        {
+            BaseChildWatcher * watcher = static_cast<BaseChildWatcher *>(userdata);
+            watcher->child_status = status;
+            queueWatcher(watcher);
+        }
+        
+        // Remove and return the watcher at the front of the queue, or nullptr if the
+        // queue is empty.
+        // TODO is this needed?:
+        BaseWatcher * pullEvent()
+        {
+            if (first) {
+                BaseWatcher * r = first;
+                first = first->next;
+                return r;
+            }
+            return nullptr;
+        }
+        
+        // Request removal of a watcher: if it is currently active, only flag it for
+        // deletion; otherwise issue the watchRemoved() notification immediately.
+        void issueDelete(BaseWatcher *watcher) noexcept
+        {
+            // This is only called when the attention lock is held, so if the watcher is not
+            // active/queued now, it cannot become active during execution of this function.
+            
+            lock.lock();
+            
+            if (watcher->active) {
+                // If the watcher is active, set deleteme true; the watcher will be removed
+                // at the end of current processing (i.e. when active is set false).
+                watcher->deleteme = true;
+            }
+            else {
+                // Actually do the delete.
+                watcher->watchRemoved();
+            }
+            
+            lock.unlock();
+        }
+    };
+}
+
+
+template <typename T_Mutex> class EventLoop
+{
+    friend class PosixFdWatcher<T_Mutex>;
+    friend class PosixSignalWatcher<T_Mutex>;
+    friend class PosixChildWatcher<T_Mutex>;
+    
+    template <typename T, typename U> using EventDispatch = dprivate::EventDispatch<T,U>;
+    template <typename T> using waitqueue = dprivate::waitqueue<T>;
+    template <typename T> using waitqueue_node = dprivate::waitqueue_node<T>;
+    using BaseWatcher = dprivate::BaseWatcher;
+    using BaseSignalWatcher = dprivate::BaseSignalWatcher<T_Mutex>;
+    using BaseFdWatcher = dprivate::BaseFdWatcher<T_Mutex>;
+    using BaseBidiFdWatcher = dprivate::BaseBidiFdWatcher<T_Mutex>;
+    using BaseChildWatcher = dprivate::BaseChildWatcher<T_Mutex>;
+    using WatchType = dprivate::WatchType;
+    
+    EpollLoop<ChildProcEvents<EventDispatch<T_Mutex, EpollTraits>>> loop_mech;
+
+    // There is a complex problem with most asynchronous event notification mechanisms
+    // when used in a multi-threaded environment. Generally, a file descriptor or other
+    // event type that we are watching will be associated with some data used to manage
+    // that event source. For example a web server needs to maintain information about
+    // each client connection, such as the state of the connection (what protocol version
+    // has been negotiated, etc; if a transfer is taking place, what file is being
+    // transferred etc).
+    //
+    // However, sometimes we want to remove an event source (eg webserver wants to drop
+    // a connection) and delete the associated data. The problem here is that it is
+    // difficult to be sure when it is ok to actually remove the data, since when
+    // requesting to unwatch the source in one thread it is still possible that an
+    // event from that source is just being reported to another thread (in which case
+    // the data will be needed).
+    //
+    // To solve that, we:
+    // - allow only one thread to poll for events at a time, using a lock
+    // - use the same lock to prevent polling, if we want to unwatch an event source
+    // - generate an event to interrupt any polling that may already be occurring in
+    //   another thread
+    // - mark handlers as active if they are currently executing, and
+    // - when removing an active handler, simply set a flag which causes it to be
+    //   removed once the current processing is finished, rather than removing it
+    //   immediately.
+    //
+    // In particular the lock mechanism for preventing multiple threads polling and
+    // for allowing polling to be interrupted is tricky. We can't use a simple mutex
+    // since there is significant chance that it will be highly contended and there
+    // are no guarantees that its acquisition will be fair. In particular, we don't
+    // want a thread that is trying to unwatch a source being starved while another
+    // thread polls the event source.
+    //
+    // So, we use two wait queues protected by a single mutex. The "attn_waitqueue"
+    // (attention queue) is the high-priority queue, used for threads wanting to
+    // unwatch event sources. The "wait_waitqueue" is the queue used by threads
+    // that wish to actually poll for events.
+    // - The head of the "attn_waitqueue" is always the holder of the lock
+    // - Therefore, a poll-waiter must be moved from the wait_waitqueue to the
+    //   attn_waitqueue to actually gain the lock. This is only done if the
+    //   attn_waitqueue is otherwise empty.
+    // - The mutex only protects manipulation of the wait queues, and so should not
+    //   be highly contended.
+    
+    T_Mutex wait_lock;  // wait lock, used to prevent multiple threads from waiting
+                        // on the event queue simultaneously.
+    waitqueue<T_Mutex> attn_waitqueue;
+    waitqueue<T_Mutex> wait_waitqueue;
+    
+    
+    // Register a signal watcher: adds a watch for 'signo' to the AEN mechanism, passing
+    // the watcher itself as the data associated with the watch.
+    void registerSignal(BaseSignalWatcher *callBack, int signo)
+    {
+        loop_mech.addSignalWatch(signo, callBack);
+    }
+    
+    // Deregister a signal watcher: the watch on 'signo' is removed from the mechanism,
+    // and the watcher is then handed to issueDelete() while the attention lock is held.
+    void deregister(BaseSignalWatcher *callBack, int signo) noexcept
+    {
+        loop_mech.removeSignalWatch(signo);
+
+        // Take the attention lock for the duration of the delete request.
+        waitqueue_node<T_Mutex> attn_node;
+        getAttnLock(attn_node);
+
+        auto & dispatch = (EventDispatch<T_Mutex, EpollTraits> &) loop_mech;
+        dispatch.issueDelete(callBack);
+
+        releaseLock(attn_node);
+    }
+
+    // Register a file descriptor watcher: adds a watch on 'fd' for the events in
+    // 'eventmask' to the AEN mechanism, with the watcher as the associated data.
+    void registerFd(BaseFdWatcher *callback, int fd, int eventmask)
+    {
+        loop_mech.addFdWatch(fd, callback, eventmask);
+    }
+    
+    // Deregister a file descriptor watcher: the watch on 'fd' is removed from the
+    // mechanism, and the watcher is then handed to issueDelete() while the attention
+    // lock is held.
+    void deregister(BaseFdWatcher *callback, int fd)
+    {
+        loop_mech.removeFdWatch(fd);
+
+        // Take the attention lock for the duration of the delete request.
+        waitqueue_node<T_Mutex> attn_node;
+        getAttnLock(attn_node);
+
+        auto & dispatch = (EventDispatch<T_Mutex, EpollTraits> &) loop_mech;
+        dispatch.issueDelete(callback);
+
+        releaseLock(attn_node);
+    }
+    
+    // Reserve a child watch in the mechanism, for later use with registerReservedChild.
+    // Also (re-)adds a SIGCHLD signal watch with no associated data (see TODO).
+    // The 'callBack' parameter is currently unused.
+    void reserveChildWatch(BaseChildWatcher *callBack)
+    {
+        loop_mech.addSignalWatch(SIGCHLD, nullptr); // TODO remove this kludge
+        loop_mech.reserveChildWatch();
+    }
+    
+    // Register a watcher for the child process 'child', with the watcher as the
+    // associated data. Also (re-)adds a SIGCHLD signal watch with no associated data
+    // (see TODO).
+    void registerChild(BaseChildWatcher *callBack, pid_t child)
+    {
+        loop_mech.addSignalWatch(SIGCHLD, nullptr); // TODO remove this kludge
+        loop_mech.addChildWatch(child, callBack);
+    }
+    
+    // Register a watcher for the child process 'child' via the mechanism's
+    // addReservedChildWatch. Unlike registerChild, this is declared noexcept and adds
+    // no SIGCHLD watch; presumably it relies on an earlier reserveChildWatch - confirm.
+    void registerReservedChild(BaseChildWatcher *callBack, pid_t child) noexcept
+    {
+        loop_mech.addReservedChildWatch(child, callBack);
+    }
+
+    // Acquire the attention lock (when held, ensures that no thread is polling the AEN
+    // mechanism).
+    // 'qnode' is the caller's queue node; it is linked into attn_waitqueue and must
+    // later be passed to releaseLock().
+    void getAttnLock(waitqueue_node<T_Mutex> &qnode)
+    {
+        // wait_lock guards the wait queues for the duration of this function.
+        std::unique_lock<T_Mutex> ulock(wait_lock);
+        attn_waitqueue.queue(&qnode);        
+        if (attn_waitqueue.getHead() != &qnode) {
+            // Some other node is at the head of the queue: interrupt any wait in
+            // progress on the mechanism, then wait until our node becomes the head.
+            loop_mech.interruptWait();
+            while (attn_waitqueue.getHead() != &qnode) {
+                qnode.wait(ulock);
+            }
+        }
+    }
+    
+    // Acquire the poll-wait lock (to be held when polling the AEN mechanism; lower
+    // priority than the attention lock).
+    // 'qnode' is the caller's queue node; it must later be passed to releaseLock().
+    void getPollwaitLock(waitqueue_node<T_Mutex> &qnode)
+    {
+        // wait_lock guards the wait queues for the duration of this function.
+        std::unique_lock<T_Mutex> ulock(wait_lock);
+        if (attn_waitqueue.getHead() == nullptr) {
+            // Queue is completely empty:
+            attn_waitqueue.queue(&qnode);
+        }
+        else {
+            // Attention queue is occupied: wait in the lower-priority queue instead.
+            wait_waitqueue.queue(&qnode);
+        }
+        
+        // Wait until our node is at the head of the attention queue.
+        while (attn_waitqueue.getHead() != &qnode) {
+            qnode.wait(ulock);
+        }    
+    }
+    
+    // Release the poll-wait/attention lock.
+    // Removes the head of the attention queue (assumed to be the caller's node; 'qnode'
+    // itself is not examined) and hands the lock on: to the next attention waiter if
+    // there is one, otherwise to the first poll waiter, if any.
+    void releaseLock(waitqueue_node<T_Mutex> &qnode)
+    {
+        std::unique_lock<T_Mutex> ulock(wait_lock);
+        waitqueue_node<T_Mutex> * nhead = attn_waitqueue.unqueue();
+        if (nhead != nullptr) {
+            nhead->signal();
+        }
+        else {
+            nhead = wait_waitqueue.getHead();
+            if (nhead != nullptr) {
+                // Move the poll waiter from the wait queue to the attention queue. It must
+                // be removed from the wait queue first; otherwise it would remain at the
+                // head of both queues and be promoted again on a later release.
+                wait_waitqueue.unqueue();
+                attn_waitqueue.queue(nhead);
+                nhead->signal();
+            }
+        }
+    }
+    
+    // Apply the rearm action requested by a signal watcher's handler: REARM re-enables
+    // the signal watch, REMOVE removes it, and any other value leaves it untouched.
+    void processSignalRearm(BaseSignalWatcher * bsw, Rearm rearmType)
+    {
+        // Called with lock held
+        switch (rearmType) {
+        case Rearm::REARM:
+            loop_mech.rearmSignalWatch_nolock(bsw->siginfo.get_signo());
+            break;
+        case Rearm::REMOVE:
+            loop_mech.removeSignalWatch_nolock(bsw->siginfo.get_signo());
+            break;
+        default:
+            break;
+        }
+    }
+
+    // Apply the rearm action requested by a file descriptor watcher's handler.
+    // For a multi-watch (bidirectional) watcher this handles the read side only.
+    // Returns 'rearmType', except that REMOVE on the read side of a multi-watch
+    // watcher whose write side has not been removed yields NOOP.
+    Rearm processFdRearm(BaseFdWatcher * bfw, Rearm rearmType, bool is_multi_watch)
+    {
+        // Called with lock held
+        if (is_multi_watch) {
+            BaseBidiFdWatcher * bdfw = static_cast<BaseBidiFdWatcher *>(bfw);
+            
+            if (rearmType == Rearm::REMOVE) {
+                // Mark the read side removed and stop watching for input events.
+                bdfw->read_removed = 1;
+                bdfw->watch_flags &= ~in_events;
+                if (! bdfw->write_removed) {
+                    // Write side still in use: the watch as a whole stays.
+                    return Rearm::NOOP;
+                }
+                else {
+                    // both removed: actually remove
+                    loop_mech.removeFdWatch_nolock(bdfw->watch_fd);
+                }
+            }
+            else if (rearmType == Rearm::DISARM) {
+                // Nothing more to do
+            }
+            else if (rearmType == Rearm::REARM) {
+                // Re-enable input events, as a one-shot watch.
+                bdfw->watch_flags |= in_events;
+                loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+                        static_cast<BaseWatcher *>(bdfw),
+                        (bdfw->watch_flags & (in_events | out_events)) | one_shot);
+            }
+            return rearmType;
+        }
+        else {
+            if (rearmType == Rearm::REARM) {
+                // Re-enable the watch (one-shot) for the watched event types.
+                loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw,
+                        (bfw->watch_flags & (in_events | out_events)) | one_shot);
+            }
+            else if (rearmType == Rearm::REMOVE) {
+                loop_mech.removeFdWatch_nolock(bfw->watch_fd);
+            }
+            return rearmType;
+        }
+    }
+
+    // Apply the rearm action requested by the write-side handler of a bidirectional
+    // watcher. Mirrors the multi-watch branch of processFdRearm, for output events.
+    // Returns 'rearmType', except that REMOVE yields NOOP while the read side has
+    // not been removed.
+    Rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, Rearm rearmType)
+    {
+        // Called with lock held
+        if (rearmType == Rearm::REMOVE) {
+            // Mark the write side removed and stop watching for output events.
+            bdfw->write_removed = 1;
+            bdfw->watch_flags &= ~out_events;
+            if (! bdfw->read_removed) {
+                // Read side still in use: the watch as a whole stays.
+                return Rearm::NOOP;
+            }
+            else {
+                // both removed: actually remove
+                loop_mech.removeFdWatch_nolock(bdfw->watch_fd);
+            }
+        }
+        else if (rearmType == Rearm::DISARM) {
+            // Nothing more to do
+        }
+        else if (rearmType == Rearm::REARM) {
+            // Re-enable output events, as a one-shot watch.
+            bdfw->watch_flags |= out_events;
+            loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+                    static_cast<BaseWatcher *>(bdfw),
+                    (bdfw->watch_flags & (in_events | out_events)) | one_shot);
+        }
+        return rearmType;
+    }
+
+    // Process all currently queued events in the calling thread.
+    // The whole pending queue is detached under the dispatch lock; watchers already
+    // flagged for deletion are unlinked and notified via watchRemoved(), and the handler
+    // of each remaining watcher is then called (without the lock held), followed by the
+    // rearm processing it requested.
+    // Returns true if at least one watcher was found in the queue that was not flagged
+    // for deletion.
+    bool processEvents() noexcept
+    {
+        EventDispatch<T_Mutex, EpollTraits> & ed = (EventDispatch<T_Mutex, EpollTraits> &) loop_mech;
+        ed.lock.lock();
+        
+        // So this pulls *all* currently pending events and processes them in the current thread.
+        // That's probably good for throughput, but maybe the behavior should be configurable.
+        
+        BaseWatcher * pqueue = ed.first;
+        ed.first = nullptr;
+        bool active = false;
+        
+        // Unlink watchers that were flagged for deletion while queued.
+        BaseWatcher * prev = nullptr;
+        BaseWatcher * q = pqueue;
+        while (q != nullptr) {
+            // Fetch the link first: after watchRemoved() the watcher must not be touched
+            // again (NOTE(review): presumably the callback may destroy it - confirm).
+            BaseWatcher * qnext = q->next;
+            if (q->deleteme) {
+                if (prev) {
+                    prev->next = qnext;
+                }
+                else {
+                    pqueue = qnext;
+                }
+                q->watchRemoved();
+            }
+            else {
+                q->active = true;
+                active = true;
+                // Remember the last surviving node, so that unlinking a later node does
+                // not drop the nodes before it from the list.
+                prev = q;
+            }
+            q = qnext;
+        }
+        
+        ed.lock.unlock();
+        
+        while (pqueue != nullptr) {
+            Rearm rearmType = Rearm::NOOP;
+            bool is_multi_watch = false;
+            BaseBidiFdWatcher *bbfw = nullptr;
+            // (Above variables are initialised only to silence compiler warnings).
+            
+            // Note that we select actions based on the type of the watch, as determined by the watchType
+            // member. In some ways this screams out for polymorphism; a virtual function could be overridden
+            // by each of the watcher types. I've instead used switch/case because I think it will perform
+            // slightly better without the overhead of a virtual function dispatch, but it's got to be a
+            // close call; I might be guilty of premature optimisation here.
+            
+            switch (pqueue->watchType) {
+            case WatchType::SIGNAL: {
+                BaseSignalWatcher *bsw = static_cast<BaseSignalWatcher *>(pqueue);
+                rearmType = bsw->gotSignal(this, bsw->siginfo.get_signo(), bsw->siginfo);
+                break;
+            }
+            case WatchType::FD: {
+                BaseFdWatcher *bfw = static_cast<BaseFdWatcher *>(pqueue);
+                is_multi_watch = bfw->watch_flags & dprivate::multi_watch;
+                if (is_multi_watch) {
+                    // The primary watcher for a multi-watch watcher is queued for
+                    // read events.
+                    BaseBidiFdWatcher *bdfw = static_cast<BaseBidiFdWatcher *>(bfw);
+                    rearmType = bdfw->readReady(this, bfw->watch_fd);
+                }
+                else {
+                    rearmType = bfw->gotEvent(this, bfw->watch_fd, bfw->event_flags);
+                }
+                break;
+            }
+            case WatchType::CHILD: {
+                BaseChildWatcher *bcw = static_cast<BaseChildWatcher *>(pqueue);
+                bcw->gotTermStat(this, bcw->watch_pid, bcw->child_status);
+                // Child watches automatically remove:
+                rearmType = Rearm::REMOVE;
+                break;
+            }
+            case WatchType::SECONDARYFD: {
+                // first construct a pointer to the main watcher:
+                char * rp = (char *)pqueue;
+                rp -= offsetof(BaseBidiFdWatcher, outWatcher);
+                bbfw = (BaseBidiFdWatcher *)rp;
+                rearmType = bbfw->writeReady(this, bbfw->watch_fd);
+                break;
+            }
+            default: ;
+            }
+            
+            ed.lock.lock();
+            
+            pqueue->active = false;
+            if (pqueue->deleteme) {
+                // We don't want a watch that is marked "deleteme" to re-arm itself.
+                // NOOP flags that the state is managed externally, so we don't adjust that.
+                if (rearmType != Rearm::NOOP) {
+                    rearmType = Rearm::REMOVE;
+                }
+            }
+            switch (pqueue->watchType) {
+            case WatchType::SIGNAL:
+                processSignalRearm(static_cast<BaseSignalWatcher *>(pqueue), rearmType);
+                break;
+            case WatchType::FD:
+                rearmType = processFdRearm(static_cast<BaseFdWatcher *>(pqueue), rearmType, is_multi_watch);
+                break;
+            case WatchType::SECONDARYFD:
+                processSecondaryRearm(bbfw, rearmType);
+                break;
+            default: ;
+            }
+            
+            if (pqueue->deleteme) rearmType = Rearm::REMOVE; // makes the watchRemoved() callback get called.
+            
+            // Fetch the link to the next queued watcher while the lock is still held and
+            // before watchRemoved() can run; the current watcher must not be accessed
+            // after that notification has been issued.
+            BaseWatcher * pnext = pqueue->next;
+            
+            ed.lock.unlock();
+            
+            if (rearmType == Rearm::REMOVE) {
+                pqueue->watchRemoved();
+            }
+            
+            pqueue = pnext;
+        }
+        
+        return active;
+    }
+
+    
+    public:
+    // Process pending events; while there are none to process, poll the AEN mechanism
+    // (blocking) for more. Returns once a call to processEvents() reports that events
+    // were processed.
+    void run() noexcept
+    {
+        for (;;) {
+            if (processEvents()) {
+                return;
+            }
+
+            // Only one thread may poll the mechanism at a time (otherwise removing
+            // event watchers becomes unmanageable), so take the poll-wait lock first.
+            waitqueue_node<T_Mutex> poll_node;
+            getPollwaitLock(poll_node);
+
+            // Move events from the AEN mechanism into our internal queue.
+            loop_mech.pullEvents(true);
+
+            // Done polling; let other threads in.
+            releaseLock(poll_node);
+        }
+    }
+};
+
+
+typedef EventLoop<NullMutex> NEventLoop;
+typedef EventLoop<std::mutex> TEventLoop;
+
+// from dasync.cc:
+TEventLoop & getSystemLoop();
+
+// Posix signal event watcher
+// Client-facing wrapper over BaseSignalWatcher: records the signal number in the
+// watcher and forwards registration/deregistration to the event loop.
+template <typename T_Mutex>
+class PosixSignalWatcher : private dprivate::BaseSignalWatcher<T_Mutex>
+{
+public:
+    using SigInfo_p = typename dprivate::BaseSignalWatcher<T_Mutex>::SigInfo_p;
+
+    // Register this watcher to watch the specified signal.
+    // If an attempt is made to register with more than one event loop at
+    // a time, behaviour is undefined.
+    inline void registerWatch(EventLoop<T_Mutex> *eloop, int signo)
+    {
+        // Clear any deletion flag left over from a previous registration.
+        this->deleteme = false;
+        this->siginfo.set_signo(signo);
+        eloop->registerSignal(this, signo);
+    }
+    
+    // Deregister this watcher from the event loop, using the signal number stored
+    // by registerWatch.
+    inline void deregisterWatch(EventLoop<T_Mutex> *eloop) noexcept
+    {
+        eloop->deregister(this, this->siginfo.get_signo());
+    }
+    
+    // virtual Rearm gotSignal(EventLoop<T_Mutex> *, int signo, SigInfo_p info) = 0;
+};
+
+// Posix file descriptor event watcher
+// Client-facing wrapper over BaseFdWatcher: records the descriptor and event flags
+// in the watcher and forwards registration/deregistration to the event loop.
+template <typename T_Mutex>
+class PosixFdWatcher : private dprivate::BaseFdWatcher<T_Mutex>
+{
+    protected:
+    
+    // Set the types of event to watch. May not be supported for all mechanisms.
+    // Only safe to call from within the callback handler (gotEvent).
+    void setWatchFlags(int newFlags)
+    {
+        this->watch_flags = newFlags;
+    }
+    
+    public:
+    
+    // Register this watcher with the event loop, to watch 'fd' for the events
+    // given by 'flags'.
+    void registerWith(EventLoop<T_Mutex> *eloop, int fd, int flags)
+    {
+        // Clear any deletion flag left over from a previous registration.
+        this->deleteme = false;
+        this->watch_fd = fd;
+        this->watch_flags = flags;
+        eloop->registerFd(this, fd, flags);
+    }
+    
+    // Deregister this watcher from the event loop, using the descriptor stored by
+    // registerWith.
+    void deregisterWatch(EventLoop<T_Mutex> *eloop) noexcept
+    {
+        eloop->deregister(this, this->watch_fd);
+    }
+    
+    // virtual Rearm gotEvent(EventLoop<T_Mutex> *, int fd, int flags) = 0;
+};
+
+// Posix child process event watcher
+// Client-facing wrapper over BaseChildWatcher: records the child process id in the
+// watcher and forwards reservation/registration to the event loop.
+template <typename T_Mutex>
+class PosixChildWatcher : private dprivate::BaseChildWatcher<T_Mutex>
+{
+    public:
+    // Reserve a child watch with the event loop, so that the watcher can later be
+    // registered via registerReserved (which is noexcept).
+    void reserveWith(EventLoop<T_Mutex> *eloop)
+    {
+        // EventLoop::reserveChildWatch takes the watcher as its argument.
+        eloop->reserveChildWatch(this);
+    }
+    
+    // Register this watcher with the event loop, to watch the child process 'child'.
+    void registerWith(EventLoop<T_Mutex> *eloop, pid_t child)
+    {
+        // Clear any deletion flag left over from a previous registration.
+        this->deleteme = false;
+        this->watch_pid = child;
+        eloop->registerChild(this, child);
+    }
+    
+    // Register this watcher for the child process 'child' through the event loop's
+    // reserved-watch path.
+    void registerReserved(EventLoop<T_Mutex> *eloop, pid_t child) noexcept
+    {
+        // Same watcher setup as registerWith: the event loop passes watch_pid to
+        // gotTermStat when the child's status is reported, so it must be recorded here.
+        this->deleteme = false;
+        this->watch_pid = child;
+        eloop->registerReservedChild(this, child);
+    }
+    
+    // virtual void gotTermStat(EventLoop<T_Mutex> *, pid_t child, int status) = 0;
+};
+
+}  // namespace dasync
+
+#endif
diff --git a/src/dasync/dmutex.h b/src/dasync/dmutex.h
new file mode 100644 (file)
index 0000000..b219621
--- /dev/null
@@ -0,0 +1,61 @@
+#ifndef D_MUTEX_H_INCLUDED
+#define D_MUTEX_H_INCLUDED
+
+//#include <pthread.h>
+#include <mutex>
+
+namespace dasync {
+
+// Simple non-recursive mutex, with priority inheritance to avoid priority inversion.
+/*
+class DMutex
+{
+    private:
+    pthread_mutex_t mutex;
+    
+    public:
+    DMutex()
+    {
+        // Avoid priority inversion by using PTHREAD_PRIO_INHERIT
+        pthread_mutexattr_t attribs;
+        pthread_mutexattr_init(&attribs);
+        pthread_mutexattr_setprotocol(&attribs, PTHREAD_PRIO_INHERIT);
+        pthread_mutex_init(&mutex, &attribs);
+    }
+    
+    void lock()
+    {
+        pthread_mutex_lock(&mutex);
+    }
+    
+    void unlock()
+    {
+        pthread_mutex_unlock(&mutex);
+    }    
+};
+*/
+
+using DMutex = std::mutex;
+
+// A "null" mutex, for which locking / unlocking actually does nothing.
+// Intended as the mutex type for single-threaded use, where no real locking is needed.
+// It provides lock(), unlock() and try_lock() so that it can be used with the standard
+// lock wrappers (std::unique_lock etc).
+class NullMutex
+{
+    #ifdef __GNUC__
+    #ifndef __clang__
+    char empty[0];  // Make class instances take up no space (gcc)    
+    #else
+    char empty[0] __attribute__((unused));  // Make class instances take up no space (clang)
+    #endif
+    #endif
+
+    public:
+    void lock() { }
+    void unlock() { }
+    // Always succeeds. Returns bool (rather than void) as required of a Lockable type,
+    // so that std::unique_lock::try_lock, std::try_to_lock and std::lock work with it.
+    bool try_lock() { return true; }
+};
+
+
+} // end of namespace
+
+
+#endif
index 03c96836fccd0c8c55548a329bc49692ea354f60..64ea5afdb3f4440051aa27480cf1bb01e6aa2a16 100644 (file)
@@ -1,14 +1,17 @@
 #include <iostream>
 #include <algorithm>
 
-#include <ev.h>
 #include <unistd.h>
 #include <fcntl.h>
 
+#include "dasync.h"
+
 #include "service.h"
 #include "dinit-log.h"
 #include "cpbuffer.h"
 
+extern EventLoop_t eventLoop;
+
 LogLevel log_level = LogLevel::WARN;
 LogLevel cons_log_level = LogLevel::WARN;
 static bool log_to_console = false;   // whether we should output log messages to
@@ -18,15 +21,19 @@ static bool log_current_line;  // Whether the current line is being logged
 static ServiceSet *service_set = nullptr;  // Reference to service set
 
 
-static void log_conn_callback(struct ev_loop *loop, struct ev_io *w, int revents) noexcept;
+namespace {
+    class BufferedLogStream;
+}
 
-class BufferedLogStream
+// TODO just make this the callback, directly.
+static Rearm log_conn_callback(EventLoop_t *loop, BufferedLogStream *w, int revents) noexcept;
+
+namespace {
+class BufferedLogStream : public PosixFdWatcher<NullMutex>
 {
     public:
     CPBuffer<4096> log_buffer;
     
-    struct ev_io eviocb;
-
     // Outgoing:
     bool partway = false;     // if we are partway throught output of a log message
     bool discarded = false;   // if we have discarded a message
@@ -40,13 +47,22 @@ class BufferedLogStream
     bool special = false;      // currently outputting special message?
     char *special_buf; // buffer containing special message
     int msg_index;     // index into special message
+    
+    int fd;
 
     void init(int fd)
     {
-        ev_io_init(&eviocb, log_conn_callback, fd, EV_WRITE);
-        eviocb.data = this;
+        //ev_io_init(&eviocb, log_conn_callback, fd, EV_WRITE);
+        //eviocb.data = this;
+        this->fd = fd;
+    }
+    
+    // Event handler for the watched descriptor: delegates to log_conn_callback,
+    // passing this stream and the event flags, and returns the rearm action it yields.
+    Rearm gotEvent(EventLoop_t *loop, int fd, int flags) noexcept override
+    {
+        return log_conn_callback(loop, this, flags);
     }
 };
+}
 
 // Two log streams:
 // (One for main log, one for console)
@@ -58,7 +74,8 @@ constexpr static int DLOG_CONS = 1; // console
 
 static void release_console()
 {
-    ev_io_stop(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb);
+    //ev_io_stop(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb);
+    //log_stream[DLOG_CONS].deregisterWatch(&eventLoop); // now handled elsewhere
     if (! log_to_console) {
         int flags = fcntl(1, F_GETFL, 0);
         fcntl(1, F_SETFL, flags & ~O_NONBLOCK);
@@ -66,9 +83,9 @@ static void release_console()
     }
 }
 
-static void log_conn_callback(struct ev_loop * loop, ev_io * w, int revents) noexcept
+static Rearm log_conn_callback(EventLoop_t * loop, BufferedLogStream * w, int revents) noexcept
 {
-    auto &log_stream = *static_cast<BufferedLogStream *>(w->data);
+    auto &log_stream = *w;
 
     if (log_stream.special) {
         char * start = log_stream.special_buf + log_stream.msg_index;
@@ -83,7 +100,7 @@ static void log_conn_callback(struct ev_loop * loop, ev_io * w, int revents) noe
             }
             else {
                 log_stream.msg_index += r;
-                return;
+                return Rearm::REARM;
             }
         }
         else {
@@ -91,7 +108,7 @@ static void log_conn_callback(struct ev_loop * loop, ev_io * w, int revents) noe
             // other error?
             // TODO
         }
-        return;
+        return Rearm::REARM;
     }
     else {
         // Writing from the regular circular buffer
@@ -100,7 +117,7 @@ static void log_conn_callback(struct ev_loop * loop, ev_io * w, int revents) noe
         
         if (log_stream.current_index == 0) {
             release_console();
-            return;
+            return Rearm::REMOVE;
         }
         
         char *ptr = log_stream.log_buffer.get_ptr(0);
@@ -127,6 +144,7 @@ static void log_conn_callback(struct ev_loop * loop, ev_io * w, int revents) noe
                 if (log_stream.current_index == 0 || !log_to_console) {
                     // No more messages buffered / stop logging to console:
                     release_console();
+                    return Rearm::REMOVE;
                 }
             }
         }
@@ -134,13 +152,13 @@ static void log_conn_callback(struct ev_loop * loop, ev_io * w, int revents) noe
             // TODO
             // EAGAIN / EWOULDBLOCK?
             // error?
-            return;
+            return Rearm::REARM;
         }
     }
     
     // We've written something by the time we get here. We could fall through to below, but
     // let's give other events a chance to be processed by returning now.
-    return;
+    return Rearm::REARM;
 }
 
 void init_log(ServiceSet *sset) noexcept
@@ -163,7 +181,8 @@ void enable_console_log(bool enable) noexcept
         //ev_io_init(& log_stream[DLOG_CONS].eviocb, log_conn_callback, 1, EV_WRITE);
         log_stream[DLOG_CONS].init(STDOUT_FILENO);
         if (log_stream[DLOG_CONS].current_index > 0) {
-            ev_io_start(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb);
+            //ev_io_start(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb);
+            log_stream[DLOG_CONS].registerWith(&eventLoop, log_stream[DLOG_CONS].fd, out_events);
         }
         log_to_console = true;
     }
@@ -173,10 +192,11 @@ void enable_console_log(bool enable) noexcept
             if (log_stream[DLOG_CONS].current_index > 0) {
                 // Try to flush any messages that are currently buffered. (Console is non-blocking
                 // so it will fail gracefully).
-                log_conn_callback(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb, EV_WRITE);
+                log_conn_callback(&eventLoop, &log_stream[DLOG_CONS], out_events);
             }
             else {
                 release_console();
+                log_stream[DLOG_CONS].deregisterWatch(&eventLoop);
             }
         }
         // (if we're partway through logging a message, we release the console when
@@ -218,7 +238,8 @@ template <typename ... T> static void do_log(T ... args) noexcept
         bool was_first = (log_stream[DLOG_CONS].current_index == 0);
         log_stream[DLOG_CONS].current_index += amount;
         if (was_first && log_to_console) {
-            ev_io_start(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb);
+            //ev_io_start(ev_default_loop(EVFLAG_AUTO), & log_stream[DLOG_CONS].eviocb);
+            log_stream[DLOG_CONS].registerWith(&eventLoop, log_stream[DLOG_CONS].fd, out_events);
         }
     }
     else {
index e540b84785e9281ac2c5e6724738e3b1a293593f..a0e7a5f7f45aadd974422ffc62801872d51b6c15 100644 (file)
@@ -13,8 +13,8 @@
 #include <fcntl.h>
 #include <pwd.h>
 
+#include "dasync.h"
 #include "service.h"
-#include "ev++.h"
 #include "control.h"
 #include "dinit-log.h"
 
  */
 
 
-static void sigint_reboot_cb(struct ev_loop *loop, ev_signal *w, int revents);
-static void sigquit_cb(struct ev_loop *loop, ev_signal *w, int revents);
-static void sigterm_cb(struct ev_loop *loop, ev_signal *w, int revents);
-void open_control_socket(struct ev_loop *loop) noexcept;
-void close_control_socket(struct ev_loop *loop) noexcept;
+using namespace dasync;
+using EventLoop_t = EventLoop<NullMutex>;
 
-struct ev_io control_socket_io;
+EventLoop_t eventLoop = EventLoop_t();
+
+// TODO remove:
+//static void sigint_reboot_cb(struct ev_loop *loop, ev_signal *w, int revents);
+//static void sigquit_cb(struct ev_loop *loop, ev_signal *w, int revents);
+//static void sigterm_cb(struct ev_loop *loop, ev_signal *w, int revents);
+static void sigint_reboot_cb(EventLoop_t *eloop) noexcept;
+static void sigquit_cb(EventLoop_t *eloop) noexcept;
+static void sigterm_cb(EventLoop_t *eloop) noexcept;
+void open_control_socket(EventLoop_t *loop) noexcept;
+void close_control_socket(EventLoop_t *loop) noexcept;
+
+static void control_socket_cb(EventLoop_t *loop, int fd);
+
+// Watcher for the control socket: each event on the descriptor is passed to
+// control_socket_cb, and the watch is then re-armed.
+class ControlSocketWatcher : public PosixFdWatcher<NullMutex>
+{
+    // Event handler; 'flags' is not examined.
+    Rearm gotEvent(EventLoop_t * loop, int fd, int flags)
+    {
+        control_socket_cb(loop, fd);
+        return Rearm::REARM;
+    }
+    
+    public:
+    // TODO the fd is already stored, must we really store it again...
+    int fd;
+    
+    // Record the descriptor in the public 'fd' member, then register with the event
+    // loop via the base class.
+    void registerWith(EventLoop_t * loop, int fd, int flags)
+    {
+        this->fd = fd;
+        PosixFdWatcher<NullMutex>::registerWith(loop, fd, flags);
+    }
+};
+
+ControlSocketWatcher control_socket_io;
 
 
 // Variables
@@ -103,6 +133,41 @@ const char * get_user_home()
 }
 
 
+namespace {
+    // Signal watcher that dispatches to a plain function pointer, so that one
+    // watcher type can be used for several signals with different handlers
+    // (main() installs sigint_reboot_cb, sigquit_cb or sigterm_cb through it).
+    class CallbackSignalHandler : public PosixSignalWatcher<NullMutex>
+    {
+        public:
+        // Handler signature: receives the event loop that reported the signal.
+        typedef void (*cb_func_t)(EventLoop_t *);
+        
+        private:
+        // Handler to invoke from gotSignal; nullptr until one has been set.
+        cb_func_t cb_func;
+        
+        public:
+        // Creates a watcher with no handler; use setCbFunc before registering.
+        CallbackSignalHandler() : cb_func(nullptr) { }
+        // Creates a watcher that will invoke pcb_func when the signal arrives.
+        CallbackSignalHandler(cb_func_t pcb_func) :  cb_func(pcb_func) { }
+        
+        // Replaces the handler invoked by gotSignal.
+        void setCbFunc(cb_func_t cb_func)
+        {
+            this->cb_func = cb_func;
+        }
+        
+        // Called by the event loop when the watched signal is received. Invokes
+        // the configured handler (rather than a hard-coded action, which would
+        // make every watched signal behave identically) and returns
+        // Rearm::REARM. signo and siginfo are not used.
+        Rearm gotSignal(EventLoop_t * eloop, int signo, SigInfo_p siginfo) override
+        {
+            // A default-constructed watcher has no handler; ignore the signal
+            // instead of calling through a null pointer.
+            if (cb_func != nullptr) {
+                cb_func(eloop);
+            }
+            return Rearm::REARM;
+        }
+    };
+
+    // NOTE(review): this repeats the name of the ControlSocketWatcher class
+    // declared at global scope earlier in this file (the one control_socket_io
+    // is an instance of) and does not appear to be used; any later unqualified
+    // use of the name would be ambiguous. Candidate for removal - confirm.
+    class ControlSocketWatcher : public PosixFdWatcher<NullMutex>
+    {
+        Rearm gotEvent(EventLoop_t * loop, int fd, int flags)
+        {
+            control_socket_cb(loop, fd);
+            return Rearm::REARM;
+        }
+    };
+}
+
 int main(int argc, char **argv)
 {
     using namespace std;
@@ -193,11 +258,12 @@ int main(int argc, char **argv)
     
     /* Set up signal handlers etc */
     /* SIG_CHILD is ignored by default: good */
-    /* sigemptyset(&sigwait_set); */
-    /* sigaddset(&sigwait_set, SIGCHLD); */
-    /* sigaddset(&sigwait_set, SIGINT); */
-    /* sigaddset(&sigwait_set, SIGTERM); */
-    /* sigprocmask(SIG_BLOCK, &sigwait_set, NULL); */
+    sigset_t sigwait_set;
+    sigemptyset(&sigwait_set);
+    sigaddset(&sigwait_set, SIGCHLD);
+    sigaddset(&sigwait_set, SIGINT);
+    sigaddset(&sigwait_set, SIGTERM);
+    sigprocmask(SIG_BLOCK, &sigwait_set, NULL);
     
     // Terminal access control signals - we block these so that dinit can't be
     // suspended if it writes to the terminal after some other process has claimed
@@ -234,35 +300,45 @@ int main(int argc, char **argv)
     }
 
     // Set up signal handlers
-    ev_signal sigint_ev_signal;
+    //ev_signal sigint_ev_signal;
+    CallbackSignalHandler sigint_watcher;
     if (am_system_init) {
-      ev_signal_init(&sigint_ev_signal, sigint_reboot_cb, SIGINT);
+      //ev_signal_init(&sigint_ev_signal, sigint_reboot_cb, SIGINT);
+      sigint_watcher.setCbFunc(sigint_reboot_cb);
     }
     else {
-      ev_signal_init(&sigint_ev_signal, sigterm_cb, SIGINT);
+      //ev_signal_init(&sigint_ev_signal, sigterm_cb, SIGINT);
+      sigint_watcher.setCbFunc(sigterm_cb);
     }
     
-    ev_signal sigquit_ev_signal;
+    //ev_signal sigquit_ev_signal;
+    CallbackSignalHandler sigquit_watcher;
     if (am_system_init) {
         // PID 1: SIGQUIT exec's shutdown
-        ev_signal_init(&sigquit_ev_signal, sigquit_cb, SIGQUIT);
+        //ev_signal_init(&sigquit_ev_signal, sigquit_cb, SIGQUIT);
+        sigquit_watcher.setCbFunc(sigquit_cb);
     }
     else {
         // Otherwise: SIGQUIT terminates dinit
-        ev_signal_init(&sigquit_ev_signal, sigterm_cb, SIGQUIT);
+        //ev_signal_init(&sigquit_ev_signal, sigterm_cb, SIGQUIT);
+        sigquit_watcher.setCbFunc(sigterm_cb);
     }
     
-    ev_signal sigterm_ev_signal;
-    ev_signal_init(&sigterm_ev_signal, sigterm_cb, SIGTERM);
+    //ev_signal sigterm_ev_signal;
+    //ev_signal_init(&sigterm_ev_signal, sigterm_cb, SIGTERM);
+    auto sigterm_watcher = CallbackSignalHandler(sigterm_cb);
     
     /* Set up libev */
-    struct ev_loop *loop = ev_default_loop(EVFLAG_AUTO /* | EVFLAG_SIGNALFD */);
-    ev_signal_start(loop, &sigint_ev_signal);
-    ev_signal_start(loop, &sigquit_ev_signal);
-    ev_signal_start(loop, &sigterm_ev_signal);
+    //struct ev_loop *loop = ev_default_loop(EVFLAG_AUTO /* | EVFLAG_SIGNALFD */);
+    //ev_signal_start(loop, &sigint_ev_signal);
+    //ev_signal_start(loop, &sigquit_ev_signal);
+    //ev_signal_start(loop, &sigterm_ev_signal);
+    sigint_watcher.registerWatch(&eventLoop, SIGINT);
+    sigquit_watcher.registerWatch(&eventLoop, SIGQUIT);
+    sigterm_watcher.registerWatch(&eventLoop, SIGTERM);
 
     // Try to open control socket (may fail due to readonly filesystem)
-    open_control_socket(loop);
+    open_control_socket(&eventLoop);
     
 #ifdef __linux__
     if (am_system_init) {
@@ -299,7 +375,8 @@ int main(int argc, char **argv)
     
     // Process events until all services have terminated.
     while (service_set->count_active_services() != 0) {
-        ev_loop(loop, EVLOOP_ONESHOT);
+        // ev_loop(loop, EVLOOP_ONESHOT);
+        eventLoop.run();
     }
 
     ShutdownType shutdown_type = service_set->getShutdownType();
@@ -321,7 +398,7 @@ int main(int argc, char **argv)
         }
     }
     
-    close_control_socket(ev_default_loop(EVFLAG_AUTO));
+    close_control_socket(&eventLoop);
     
     if (am_system_init) {
         if (shutdown_type == ShutdownType::CONTINUE) {
@@ -357,7 +434,8 @@ int main(int argc, char **argv)
         
         // PID 1 must not actually exit, although we should never reach this point:
         while (true) {
-            ev_loop(loop, EVLOOP_ONESHOT);
+            // ev_loop(loop, EVLOOP_ONESHOT);
+            eventLoop.run();
         }
     }
     
@@ -365,15 +443,13 @@ int main(int argc, char **argv)
 }
 
 // Callback for control socket
-static void control_socket_cb(struct ev_loop *loop, ev_io *w, int revents)
+static void control_socket_cb(EventLoop_t *loop, int sockfd)
 {
     // TODO limit the number of active connections. Keep a tally, and disable the
     // control connection listening socket watcher if it gets high, and re-enable
     // it once it falls below the maximum.
 
     // Accept a connection
-    int sockfd = w->fd;
-
     int newfd = accept4(sockfd, nullptr, nullptr, SOCK_NONBLOCK | SOCK_CLOEXEC);
 
     if (newfd != -1) {
@@ -387,7 +463,7 @@ static void control_socket_cb(struct ev_loop *loop, ev_io *w, int revents)
     }
 }
 
-void open_control_socket(struct ev_loop *loop) noexcept
+void open_control_socket(EventLoop_t *loop) noexcept
 {
     if (! control_socket_open) {
         const char * saddrname = control_socket_path;
@@ -441,16 +517,18 @@ void open_control_socket(struct ev_loop *loop) noexcept
         }
 
         control_socket_open = true;
-        ev_io_init(&control_socket_io, control_socket_cb, sockfd, EV_READ);
-        ev_io_start(loop, &control_socket_io);
+        //ev_io_init(&control_socket_io, control_socket_cb, sockfd, EV_READ);
+        //ev_io_start(loop, &control_socket_io);
+        control_socket_io.registerWith(&eventLoop, sockfd, in_events);
     }
 }
 
-void close_control_socket(struct ev_loop *loop) noexcept
+void close_control_socket(EventLoop_t *loop) noexcept
 {
     if (control_socket_open) {
         int fd = control_socket_io.fd;
-        ev_io_stop(loop, &control_socket_io);
+        //ev_io_stop(loop, &control_socket_io);
+        control_socket_io.deregisterWatch(&eventLoop);
         close(fd);
         
         // Unlink the socket:
@@ -459,24 +537,24 @@ void close_control_socket(struct ev_loop *loop) noexcept
 }
 
 /* handle SIGINT signal (generated by kernel when ctrl+alt+del pressed) */
-static void sigint_reboot_cb(struct ev_loop *loop, ev_signal *w, int revents)
+static void sigint_reboot_cb(EventLoop_t *eloop) noexcept
 {
     service_set->stop_all_services(ShutdownType::REBOOT);
 }
 
 /* handle SIGQUIT (if we are system init) */
-static void sigquit_cb(struct ev_loop *loop, ev_signal *w, int revents)
+static void sigquit_cb(EventLoop_t *eloop) noexcept
 {
     // This allows remounting the filesystem read-only if the dinit binary has been
     // unlinked. In that case the kernel holds the binary open, so that it can't be
     // properly removed.
-    close_control_socket(ev_default_loop(EVFLAG_AUTO));
+    close_control_socket(eloop);
     execl("/sbin/shutdown", "/sbin/shutdown", (char *) 0);
     log(LogLevel::ERROR, "Error executing /sbin/shutdown: ", strerror(errno));
 }
 
 /* handle SIGTERM/SIGQUIT - stop all services (not used for system daemon) */
-static void sigterm_cb(struct ev_loop *loop, ev_signal *w, int revents)
+static void sigterm_cb(EventLoop_t *eloop) noexcept
 {
     service_set->stop_all_services();
 }
index 41fdbcaa8808e314a7d1ca4e6b21ed2a14b5ea89..98de0e5834be9f34296733381335c9aa325f6c79 100644 (file)
@@ -18,8 +18,8 @@
 #include "dinit-log.h"
 
 // from dinit.cc:
-void open_control_socket(struct ev_loop *loop) noexcept;
-
+void open_control_socket(EventLoop_t *loop) noexcept;
+extern EventLoop_t eventLoop;
 
 // Find the requested service by name
 static ServiceRecord * findService(const std::list<ServiceRecord *> & records,
@@ -91,13 +91,12 @@ void ServiceRecord::stopped() noexcept
     }
 }
 
-void ServiceRecord::process_child_callback(struct ev_loop *loop, ev_child *w, int revents) noexcept
+void ServiceChildWatcher::gotTermStat(EventLoop_t * loop, pid_t child, int status) noexcept
 {
-    ServiceRecord *sr = (ServiceRecord *) w->data;
-
+    ServiceRecord *sr = service;
+    
     sr->pid = -1;
-    sr->exit_status = w->rstatus;
-    ev_child_stop(loop, w);
+    sr->exit_status = status;
     
     // Ok, for a process service, any process death which we didn't rig
     // ourselves is a bit... unexpected. Probably, the child died because
@@ -110,7 +109,7 @@ void ServiceRecord::process_child_callback(struct ev_loop *loop, ev_child *w, in
         return;
     }
     
-    sr->handle_exit_status();
+    sr->handle_exit_status();    
 }
 
 bool ServiceRecord::do_auto_restart() noexcept
@@ -206,15 +205,22 @@ void ServiceRecord::handle_exit_status() noexcept
     }
 }
 
-void ServiceRecord::process_child_status(struct ev_loop *loop, ev_io * stat_io, int revents) noexcept
+Rearm ServiceIoWatcher::gotEvent(EventLoop_t *loop, int fd, int flags) noexcept
 {
-    ServiceRecord *sr = (ServiceRecord *) stat_io->data;
+    ServiceRecord::process_child_status(loop, this, flags);
+    return Rearm::NOOP;
+}
+
+// TODO remove unused revents param
+void ServiceRecord::process_child_status(EventLoop_t *loop, ServiceIoWatcher * stat_io, int revents) noexcept
+{
+    ServiceRecord *sr = stat_io->service;
     sr->waiting_for_execstat = false;
     
     int exec_status;
     int r = read(stat_io->fd, &exec_status, sizeof(int));
     close(stat_io->fd);
-    ev_io_stop(loop, stat_io);
+    stat_io->deregisterWatch(loop);
     
     if (r != 0) {
         // We read an errno code; exec() failed, and the service startup failed.
@@ -224,7 +230,7 @@ void ServiceRecord::process_child_status(struct ev_loop *loop, ev_io * stat_io,
             sr->failed_to_start();
         }
         else if (sr->service_state == ServiceState::STOPPING) {
-            // Must be a scripted servce. We've logged the failure, but it's probably better
+            // Must be a scripted service. We've logged the failure, but it's probably better
             // not to leave the service in STARTED state:
             sr->stopped();
         }
@@ -506,10 +512,11 @@ bool ServiceRecord::read_pid_file() noexcept
         if (r > 0) {
             pidbuf[r] = 0; // store nul terminator
             pid = std::atoi(pidbuf);
-            if (kill(pid, 0) == 0) {
-                ev_child_init(&child_listener, process_child_callback, pid, 0);
-                child_listener.data = this;
-                ev_child_start(ev_default_loop(EVFLAG_AUTO), &child_listener);
+            if (kill(pid, 0) == 0) {                
+                //ev_child_init(&child_listener, process_child_callback, pid, 0);
+                //child_listener.data = this;
+                //ev_child_start(ev_default_loop(EVFLAG_AUTO), &child_listener);
+                child_listener.registerWith(&eventLoop, pid);
             }
             else {
                 log(LogLevel::ERROR, service_name, ": pid read from pidfile (", pid, ") is not valid");
@@ -546,7 +553,7 @@ void ServiceRecord::started() noexcept
     notifyListeners(ServiceEvent::STARTED);
 
     if (onstart_flags.rw_ready) {
-        open_control_socket(ev_default_loop(EVFLAG_AUTO));
+        open_control_socket(&eventLoop);
     }
 
     if (force_stop || desired_state == ServiceState::STOPPED) {
@@ -634,7 +641,16 @@ bool ServiceRecord::start_ps_process(const std::vector<const char *> &cmd, bool
     if (forkpid == 0) {
         // Child process. Must not allocate memory (or otherwise risk throwing any exception)
         // from here until exit().
-        ev_default_destroy(); // won't need that on this side, free up fds.
+        // TODO: we may need an equivalent for the following:
+        //   ev_default_destroy(); // won't need that on this side, free up fds.
+
+        // Unmask signals that we masked on startup:
+        sigset_t sigwait_set;
+        sigemptyset(&sigwait_set);
+        sigaddset(&sigwait_set, SIGCHLD);
+        sigaddset(&sigwait_set, SIGINT);
+        sigaddset(&sigwait_set, SIGTERM);
+        sigprocmask(SIG_UNBLOCK, &sigwait_set, NULL);
 
         constexpr int bufsz = ((CHAR_BIT * sizeof(pid_t) - 1) / 3 + 2) + 11;
         // "LISTEN_PID=" - 11 characters
@@ -693,15 +709,13 @@ bool ServiceRecord::start_ps_process(const std::vector<const char *> &cmd, bool
         pid = forkpid;
 
         // Listen for status
-        ev_io_init(&child_status_listener, process_child_status, pipefd[0], EV_READ);
-        child_status_listener.data = this;
-        ev_io_start(ev_default_loop(EVFLAG_AUTO), &child_status_listener);
+        // TODO should set this up earlier so we can handle failure case (exception)
+        child_status_listener.registerWith(&eventLoop, pipefd[0], in_events);
 
         // Add a process listener so we can detect when the
         // service stops
-        ev_child_init(&child_listener, process_child_callback, pid, 0);
-        child_listener.data = this;
-        ev_child_start(ev_default_loop(EVFLAG_AUTO), &child_listener);
+        // TODO should reserve listener, handle exceptions
+        child_listener.registerWith(&eventLoop, pid);
         waiting_for_execstat = true;
         return true;
     }
index 35a28a3f21657da50d4880c992000c6656a1cd1f..66a3014a1080babe7946da6d38bb26fc01a6f340 100644 (file)
@@ -6,7 +6,9 @@
 #include <vector>
 #include <csignal>
 #include <unordered_set>
-#include "ev.h"
+
+#include "dasync.h"
+
 #include "control.h"
 #include "service-listener.h"
 #include "service-constants.h"
@@ -146,9 +148,38 @@ static std::vector<const char *> separate_args(std::string &s, std::list<std::pa
     return r;
 }
 
+// Child-process watcher tied to a single ServiceRecord. The ServiceRecord
+// registers it with the event loop for the pid of the service process.
+class ServiceChildWatcher : public PosixChildWatcher<NullMutex>
+{
+    public:
+    // TODO resolve clunkiness of storing this field
+    // The service this watcher reports to; set once by the constructor.
+    ServiceRecord * service;
+    // Termination callback (defined out of line): resets the service's pid to -1
+    // and stores 'status' as its exit_status before any further handling.
+    void gotTermStat(EventLoop_t * eloop, pid_t child, int status) noexcept;
+    
+    // Binds the watcher to the service record 'sr'.
+    ServiceChildWatcher(ServiceRecord * sr) noexcept : service(sr) { }
+};
+
+// File-descriptor watcher tied to a single ServiceRecord; the ServiceRecord uses
+// it to watch the read end of the pipe on which a forked child reports its exec
+// status.
+class ServiceIoWatcher : public PosixFdWatcher<NullMutex>
+{
+    public:
+    // TODO resolve clunkiness of storing these fields
+    // Copy of the descriptor given to registerWith(); process_child_status reads
+    // from it and closes it. Not initialised until registerWith() is called.
+    int fd;
+    // The service this watcher reports to; set once by the constructor.
+    ServiceRecord * service;
+    // Event callback (defined out of line): passes this watcher to
+    // ServiceRecord::process_child_status and returns Rearm::NOOP.
+    Rearm gotEvent(EventLoop_t * eloop, int fd, int flags) noexcept;
+    
+    // Binds the watcher to the service record 'sr'.
+    ServiceIoWatcher(ServiceRecord * sr) noexcept : service(sr) { }
+    
+    // Remembers the descriptor in 'fd', then delegates to the base class
+    // registerWith with the same arguments.
+    void registerWith(EventLoop_t *loop, int fd, int flags)
+    {
+        this->fd = fd;
+        PosixFdWatcher<NullMutex>::registerWith(loop, fd, flags);
+    }
+};
 
 class ServiceRecord
 {
+    friend class ServiceChildWatcher;
+    friend class ServiceIoWatcher;
+    
     typedef std::string string;
     
     string service_name;
@@ -222,8 +253,8 @@ class ServiceRecord
     int socket_fd = -1;  // For socket-activation services, this is the file
                          // descriptor for the socket.
 
-    ev_child child_listener;
-    ev_io child_status_listener;
+    ServiceChildWatcher child_listener;
+    ServiceIoWatcher child_status_listener;
     
     // All dependents have stopped.
     void allDepsStopped();
@@ -244,10 +275,10 @@ class ServiceRecord
     bool start_ps_process(const std::vector<const char *> &args, bool on_console) noexcept;
     
     // Callback from libev when a child process dies
-    static void process_child_callback(struct ev_loop *loop, struct ev_child *w,
+    static void process_child_callback(EventLoop_t *loop, ServiceChildWatcher *w,
             int revents) noexcept;
     
-    static void process_child_status(struct ev_loop *loop, ev_io * stat_io,
+    static void process_child_status(EventLoop_t *loop, ServiceIoWatcher * stat_io,
             int revents) noexcept;
     
     void handle_exit_status() noexcept;
@@ -327,7 +358,7 @@ class ServiceRecord
         : service_state(ServiceState::STOPPED), desired_state(ServiceState::STOPPED), auto_restart(false),
             pinned_stopped(false), pinned_started(false), waiting_for_deps(false),
             waiting_for_execstat(false), doing_recovery(false),
-            start_explicit(false), force_stop(false)
+            start_explicit(false), force_stop(false), child_listener(this), child_status_listener(this)
     {
         service_set = set;
         service_name = name;