Update Dasynq, again, and update Dinit code to reflect API changes.
authorDavin McCall <davmac@davmac.org>
Sat, 18 Jun 2016 14:11:19 +0000 (15:11 +0100)
committerDavin McCall <davmac@davmac.org>
Sat, 18 Jun 2016 14:11:19 +0000 (15:11 +0100)
Hopefully the Dasynq API is now fairly stable.

src/control.cc
src/control.h
src/dasynq/dasynq-childproc.h
src/dasynq/dasynq-epoll.h
src/dasynq/dasynq-kqueue.h
src/dasynq/dasynq.h
src/dinit-log.cc
src/dinit.cc
src/service.cc
src/service.h

index 20478cfc212a6c1fc32146e661ae6384b177350f..5cc8798caea449421fa299bd0463a4514a49da9f 100644 (file)
@@ -51,7 +51,7 @@ bool ControlConn::processPacket()
         char outbuf[] = { DINIT_RP_BADREQ };
         if (! queuePacket(outbuf, 1)) return false;
         bad_conn_close = true;
-        iob.setWatchFlags(OUT_EVENTS);
+        iob.setWatches(OUT_EVENTS);
     }
     return true;
 }
@@ -75,7 +75,7 @@ bool ControlConn::processFindLoad(int pktType)
         char badreqRep[] = { DINIT_RP_BADREQ };
         if (! queuePacket(badreqRep, 1)) return false;
         bad_conn_close = true;
-        iob.setWatchFlags(OUT_EVENTS);
+        iob.setWatches(OUT_EVENTS);
         return true;
     }
     
@@ -151,7 +151,7 @@ bool ControlConn::processStartStop(int pktType)
         char badreqRep[] = { DINIT_RP_BADREQ };
         if (! queuePacket(badreqRep, 1)) return false;
         bad_conn_close = true;
-        iob.setWatchFlags(OUT_EVENTS);
+        iob.setWatches(OUT_EVENTS);
         return true;
     }
     else {
@@ -225,7 +225,7 @@ bool ControlConn::processUnpinService()
         char badreqRep[] = { DINIT_RP_BADREQ };
         if (! queuePacket(badreqRep, 1)) return false;
         bad_conn_close = true;
-        iob.setWatchFlags(OUT_EVENTS);
+        iob.setWatches(OUT_EVENTS);
         return true;
     }
     else {
@@ -288,7 +288,7 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept
         else {
             if ((unsigned)wr == size) {
                 // Ok, all written.
-                iob.setWatchFlags(in_flag);
+                iob.setWatches(in_flag);
                 return true;
             }
             pkt += wr;
@@ -299,7 +299,7 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept
     // Create a vector out of the (remaining part of the) packet:
     try {
         outbuf.emplace_back(pkt, pkt + size);
-        iob.setWatchFlags(in_flag | OUT_EVENTS);
+        iob.setWatches(in_flag | OUT_EVENTS);
         return true;
     }
     catch (std::bad_alloc &baexc) {
@@ -313,7 +313,7 @@ bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept
             return false;
         }
         else {
-            iob.setWatchFlags(OUT_EVENTS);
+            iob.setWatches(OUT_EVENTS);
             return true;
         }
     }
@@ -343,7 +343,7 @@ bool ControlConn::queuePacket(std::vector<char> &&pkt) noexcept
         else {
             if ((unsigned)wr == pkt.size()) {
                 // Ok, all written.
-                iob.setWatchFlags(in_flag);
+                iob.setWatches(in_flag);
                 return true;
             }
             outpkt_index = wr;
@@ -352,7 +352,7 @@ bool ControlConn::queuePacket(std::vector<char> &&pkt) noexcept
     
     try {
         outbuf.emplace_back(pkt);
-        iob.setWatchFlags(in_flag | OUT_EVENTS);
+        iob.setWatches(in_flag | OUT_EVENTS);
         return true;
     }
     catch (std::bad_alloc &baexc) {
@@ -366,7 +366,7 @@ bool ControlConn::queuePacket(std::vector<char> &&pkt) noexcept
             return false;
         }
         else {
-            iob.setWatchFlags(OUT_EVENTS);
+            iob.setWatches(OUT_EVENTS);
             return true;
         }
     }
@@ -412,11 +412,11 @@ bool ControlConn::dataReady() noexcept
         // TODO log error?
         // TODO error response?
         bad_conn_close = true;
-        iob.setWatchFlags(OUT_EVENTS);
+        iob.setWatches(OUT_EVENTS);
     }
     else {
         int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
-        iob.setWatchFlags(IN_EVENTS | out_flags);
+        iob.setWatches(IN_EVENTS | out_flags);
     }
     
     return false;
@@ -458,7 +458,7 @@ bool ControlConn::sendData() noexcept
         outpkt_index = 0;
         if (outbuf.empty() && ! oom_close) {
             if (! bad_conn_close) {
-                iob.setWatchFlags(IN_EVENTS);
+                iob.setWatches(IN_EVENTS);
             }
             else {
                 return true;
@@ -472,7 +472,7 @@ bool ControlConn::sendData() noexcept
 ControlConn::~ControlConn() noexcept
 {
     close(iob.fd);
-    iob.deregisterWatch(loop);
+    iob.deregister(*loop);
     
     // Clear service listeners
     for (auto p : serviceKeyMap) {
index a184039ffb3babea65c6f927487833c91ff41eee..05b48459ffb7444203057c1f90b2bb0fae38716e 100644 (file)
@@ -49,14 +49,14 @@ class ServiceRecord;
 
 class ControlConnWatcher : public EventLoop_t::BidiFdWatcher
 {
-    inline Rearm receiveEvent(EventLoop_t loop, int fd, int flags) noexcept;
+    inline Rearm receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept;
 
-    Rearm readReady(EventLoop_t loop, int fd) noexcept override
+    Rearm readReady(EventLoop_t &loop, int fd) noexcept override
     {
         return receiveEvent(loop, fd, IN_EVENTS);
     }
     
-    Rearm writeReady(EventLoop_t loop, int fd) noexcept override
+    Rearm writeReady(EventLoop_t &loop, int fd) noexcept override
     {
         return receiveEvent(loop, fd, OUT_EVENTS);
     }
@@ -65,22 +65,22 @@ class ControlConnWatcher : public EventLoop_t::BidiFdWatcher
     int fd; // TODO this is already stored, find a better way to access it.
     EventLoop_t * eventLoop;
     
-    void setWatchFlags(int flags)
+    void setWatches(int flags)
     {
-        EventLoop_t::BidiFdWatcher::setWatchFlags(eventLoop, flags);
+        EventLoop_t::BidiFdWatcher::setWatches(*eventLoop, flags);
     }
     
-    void registerWith(EventLoop_t *loop, int fd, int flags)
+    void registerWith(EventLoop_t &loop, int fd, int flags)
     {
         this->fd = fd;
-        this->eventLoop = loop;
-        BidiFdWatcher<EventLoop_t>::registerWith(loop, fd, flags);
+        this->eventLoop = &loop;
+        BidiFdWatcher<EventLoop_t>::addWatch(loop, fd, flags);
     }
 };
 
-inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t loop, int fd, int flags) noexcept
+inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept
 {
-    return control_conn_cb(loop, this, flags);
+    return control_conn_cb(&loop, this, flags);
 }
 
 
@@ -169,7 +169,7 @@ class ControlConn : private ServiceListener
     {
         bad_conn_close = true;
         oom_close = true;
-        iob.setWatchFlags(OUT_EVENTS);
+        iob.setWatches(OUT_EVENTS);
     }
     
     // Process service event broadcast.
@@ -206,7 +206,7 @@ class ControlConn : private ServiceListener
     public:
     ControlConn(EventLoop_t * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0)
     {
-        iob.registerWith(loop, fd, IN_EVENTS);
+        iob.registerWith(*loop, fd, IN_EVENTS);
         active_control_conns++;
     }
     
index e364613e0f3ebde071ca77898db4ce834ee7bb61..af33e2619fc1575d150ae55fc0a05b6feb1eb9bb 100644 (file)
@@ -56,6 +56,11 @@ class pid_map
         backup_vector.resize(backup_vector.size() + 1);
     }
     
+    void unreserve() noexcept
+    {
+        backup_vector.resize(backup_vector.size() - 1);
+    }
+    
     void add(pid_t key, void *val) // throws std::bad_alloc
     {
         base_map[key] = val;
@@ -113,18 +118,38 @@ template <class Base> class ChildProcEvents : public Base
     public:
     void reserveChildWatch()
     {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
         child_waiters.reserve();
     }
     
+    void unreserveChildWatch() noexcept
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        child_waiters.unreserve();
+    }
+    
     void addChildWatch(pid_t child, void *val)
     {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
         child_waiters.add(child, val);
     }
     
     void addReservedChildWatch(pid_t child, void *val) noexcept
     {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
         child_waiters.add_from_reserve(child, val);
     }
+
+    void addReservedChildWatch_nolock(pid_t child, void *val) noexcept
+    {
+        child_waiters.add_from_reserve(child, val);
+    }
+    
+    void removeChildWatch(pid_t child) noexcept
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        child_waiters.erase(child);
+    }
     
     template <typename T> void init(T *loop_mech)
     {
index 1ffbc56edf2014c3cb56af5a54ea6804669563f8..7a93e21fcb99f2c2074f28dd91bc4b4bc01ed883 100644 (file)
@@ -63,6 +63,7 @@ class EpollTraits
     
     const static bool has_bidi_fd_watch = true;
     const static bool has_separate_rw_fd_watches = false;
+    const static bool supports_childwatch_reservation = true;
 };
 
 
@@ -167,6 +168,12 @@ template <class Base> class EpollLoop : public Base
         }
     }
     
+    void addBidiFdWatch(int fd, void *userdata, int flags)
+    {
+        // No implementation.
+        throw std::system_error(std::make_error_code(std::errc::not_supported));
+    }
+    
     void removeFdWatch(int fd, int flags) noexcept
     {
         epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr);
@@ -177,6 +184,12 @@ template <class Base> class EpollLoop : public Base
         removeFdWatch(fd, flags);
     }
     
+    void removeBidiFdWatch(int fd) noexcept
+    {
+        // Shouldn't be called for epoll.
+        removeFdWatch(fd, IN_EVENTS | OUT_EVENTS);
+    }
+    
     // Note this will *replace* the old flags with the new, that is,
     // it can enable *or disable* read/write events.
     void enableFdWatch(int fd, void *userdata, int flags) noexcept
index 64a3828cd9a0c00b63012ec3209b23952b3ce57b..f6d4ce27a9d308670a3b95725a67a00a6c3e9346 100644 (file)
@@ -71,6 +71,7 @@ class KqueueTraits
     
     const static bool has_bidi_fd_watch = false;
     const static bool has_separate_rw_fd_watches = true;
+    const static bool supports_childwatch_reservation = true;
 };
 
 #if defined(__OpenBSD__)
@@ -199,6 +200,8 @@ template <class Base> class KqueueLoop : public Base
     void addFdWatch(int fd, void *userdata, int flags)
     {
         // TODO kqueue doesn't support EVFILE_WRITE on file fd's :/
+        // Presumably they cause the kevent call to fail. We could maintain
+        // a separate set and use poll() (urgh).
         
         short filter = (flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE;
         
@@ -209,6 +212,17 @@ template <class Base> class KqueueLoop : public Base
         }
     }
     
+    void addBidiFdWatch(int fd, void *userdata, int flags)
+    {
+        struct kevent kev[2];
+        EV_SET(&kev[0], fd, EVFILT_READ, EV_ADD, 0, 0, userdata);
+        EV_SET(&kev[1], fd, EVFILE_WRITE, EV_ADD, 0, 0, userdata);
+        
+        if (kevent(kqfd, kev, 2, nullptr, 0, nullptr) == -1) {
+            throw new std::system_error(errno, std::system_category());
+        }        
+    }
+    
     void removeFdWatch(int fd, int flags)
     {        
         removeFilter((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd);
@@ -218,6 +232,15 @@ template <class Base> class KqueueLoop : public Base
     {
         removeFdWatch(fd, flags);
     }
+
+    void removeBidiFdWatch(int fd) noexcept
+    {
+        struct kevent kev[2];
+        EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
+        EV_SET(&kev[1], fd, EVFILE_WRITE, EV_DELETE, 0, 0, userdata);
+        
+        kevent(kqfd, kev, 2, nullptr, 0, nullptr);
+    }
     
     void enableFdWatch(int fd, void *userdata, int flags)
     {
index d9b4b4a37037b015f4964d3fc742d275e48acc7e..426fd8b6ce0b3cab9ff4eb868667fe1f2d2d48af 100644 (file)
@@ -30,6 +30,10 @@ namespace dasynq {
 #include <condition_variable>
 #include <cstdint>
 #include <cstddef>
+#include <system_error>
+
+#include <unistd.h>
+#include <fcntl.h>
 
 #include "dasynq-mutex.h"
 
@@ -159,7 +163,7 @@ namespace dprivate {
         public:
         typedef SigInfo &SigInfo_p;
         
-        virtual Rearm gotSignal(EventLoop<T_Mutex> * eloop, int signo, SigInfo_p siginfo) = 0;
+        virtual Rearm received(EventLoop<T_Mutex> &eloop, int signo, SigInfo_p siginfo) = 0;
     };
     
     template <typename T_Mutex>
@@ -176,7 +180,7 @@ namespace dprivate {
         BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { }
         
         public:
-        virtual Rearm gotEvent(EventLoop<T_Mutex> * eloop, int fd, int flags) = 0;
+        virtual Rearm fdEvent(EventLoop<T_Mutex> &eloop, int fd, int flags) = 0;
     };
     
     template <typename T_Mutex>
@@ -186,7 +190,7 @@ namespace dprivate {
         friend class dasynq::EventLoop<T_Mutex>;
         
         // This should never actually get called:
-        Rearm gotEvent(EventLoop<T_Mutex> * eloop, int fd, int flags) final
+        Rearm fdEvent(EventLoop<T_Mutex> &eloop, int fd, int flags) final
         {
             return Rearm::REARM; // should not be reachable.
         };
@@ -201,8 +205,8 @@ namespace dprivate {
         int write_removed : 1; // write watch removed?
         
         public:
-        virtual Rearm readReady(EventLoop<T_Mutex> eloop, int fd) noexcept = 0;
-        virtual Rearm writeReady(EventLoop<T_Mutex> eloop, int fd) noexcept = 0;
+        virtual Rearm readReady(EventLoop<T_Mutex> &eloop, int fd) noexcept = 0;
+        virtual Rearm writeReady(EventLoop<T_Mutex> &eloop, int fd) noexcept = 0;
     };
     
     template <typename T_Mutex>
@@ -218,7 +222,7 @@ namespace dprivate {
         BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { }
         
         public:
-        virtual void gotTermStat(EventLoop<T_Mutex> * eloop, pid_t child, int status) = 0;
+        virtual Rearm childStatus(EventLoop<T_Mutex> &eloop, pid_t child, int status) = 0;
     };
 
     // Classes for implementing a fair(ish) wait queue.
@@ -431,8 +435,6 @@ namespace dprivate {
             
             lock.lock();
             
-            // TODO this needs to handle multi-watch (BidiFdWatcher) properly
-            
             if (watcher->active) {
                 // If the watcher is active, set deleteme true; the watcher will be removed
                 // at the end of current processing (i.e. when active is set false).
@@ -496,6 +498,10 @@ template <typename T_Mutex> class EventLoop
     friend class dprivate::SignalWatcher<EventLoop<T_Mutex>>;
     friend class dprivate::ChildProcWatcher<EventLoop<T_Mutex>>;
     
+    public:
+    using LoopTraits = dasynq::LoopTraits;
+    
+    private:
     template <typename T, typename U> using EventDispatch = dprivate::EventDispatch<T,U>;
     template <typename T> using waitqueue = dprivate::waitqueue<T>;
     template <typename T> using waitqueue_node = dprivate::waitqueue_node<T>;
@@ -587,7 +593,7 @@ template <typename T_Mutex> class EventLoop
     void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask)
     {
         if (LoopTraits::has_separate_rw_fd_watches) {
-            // TODO
+            loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT);
         }
         else {
             loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT);
@@ -630,7 +636,7 @@ template <typename T_Mutex> class EventLoop
     void deregister(BaseBidiFdWatcher *callback, int fd)
     {
         if (LoopTraits::has_separate_rw_fd_watches) {
-            // TODO
+            loop_mech.removeBidiFdWatch(fd);
         }
         else {
             loop_mech.removeFdWatch(fd, callback->watch_flags);
@@ -650,6 +656,11 @@ template <typename T_Mutex> class EventLoop
         loop_mech.reserveChildWatch();
     }
     
+    void unreserve(BaseChildWatcher *callBack)
+    {
+        loop_mech.unreserveChildWatch();
+    }
+    
     void registerChild(BaseChildWatcher *callBack, pid_t child)
     {
         loop_mech.addChildWatch(child, callBack);
@@ -659,6 +670,24 @@ template <typename T_Mutex> class EventLoop
     {
         loop_mech.addReservedChildWatch(child, callBack);
     }
+
+    void registerReservedChild_nolock(BaseChildWatcher *callBack, pid_t child) noexcept
+    {
+        loop_mech.addReservedChildWatch_nolock(child, callBack);
+    }
+    
+    void deregister(BaseChildWatcher *callback, pid_t child)
+    {
+        loop_mech.removeChildWatch(child);
+
+        waitqueue_node<T_Mutex> qnode;
+        getAttnLock(qnode);
+        
+        EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
+        ed.issueDelete(callback);
+        
+        releaseLock(qnode);
+    }
     
     void dequeueWatcher(BaseWatcher *watcher) noexcept
     {
@@ -860,6 +889,7 @@ template <typename T_Mutex> class EventLoop
             }
             else if (pqueue->watchType == WatchType::SECONDARYFD) {
                 is_multi_watch = true;
+                // construct a pointer to the main watcher:
                 char * rp = (char *)pqueue;
                 rp -= offsetof(BaseBidiFdWatcher, outWatcher);
                 bbfw = (BaseBidiFdWatcher *)rp;
@@ -879,7 +909,7 @@ template <typename T_Mutex> class EventLoop
             switch (pqueue->watchType) {
             case WatchType::SIGNAL: {
                 BaseSignalWatcher *bsw = static_cast<BaseSignalWatcher *>(pqueue);
-                rearmType = bsw->gotSignal(this, bsw->siginfo.get_signo(), bsw->siginfo);
+                rearmType = bsw->received(*this, bsw->siginfo.get_signo(), bsw->siginfo);
                 break;
             }
             case WatchType::FD: {
@@ -887,24 +917,20 @@ template <typename T_Mutex> class EventLoop
                 if (is_multi_watch) {
                     // The primary watcher for a multi-watch watcher is queued for
                     // read events.
-                    rearmType = bbfw->readReady(this, bfw->watch_fd);
+                    rearmType = bbfw->readReady(*this, bfw->watch_fd);
                 }
                 else {
-                    rearmType = bfw->gotEvent(this, bfw->watch_fd, bfw->event_flags);
+                    rearmType = bfw->fdEvent(*this, bfw->watch_fd, bfw->event_flags);
                 }
                 break;
             }
             case WatchType::CHILD: {
                 BaseChildWatcher *bcw = static_cast<BaseChildWatcher *>(pqueue);
-                bcw->gotTermStat(this, bcw->watch_pid, bcw->child_status);
-                // Child watches automatically remove:
-                // TODO what if they want to return REMOVED...
-                rearmType = Rearm::REMOVE;
+                rearmType = bcw->childStatus(*this, bcw->watch_pid, bcw->child_status);
                 break;
             }
             case WatchType::SECONDARYFD: {
-                // first construct a pointer to the main watcher:
-                rearmType = bbfw->writeReady(this, bbfw->watch_fd);
+                rearmType = bbfw->writeReady(*this, bbfw->watch_fd);
                 break;
             }
             default: ;
@@ -956,6 +982,8 @@ template <typename T_Mutex> class EventLoop
     using SignalWatcher = dprivate::SignalWatcher<EventLoop<T_Mutex>>;
     using ChildProcWatcher = dprivate::ChildProcWatcher<EventLoop<T_Mutex>>;
     
+    // using LoopTraits = dasynq::LoopTraits;
+    
     void run() noexcept
     {
         while (! processEvents()) {
@@ -994,20 +1022,21 @@ public:
 
     // Register this watcher to watch the specified signal.
     // If an attempt is made to register with more than one event loop at
-    // a time, behaviour is undefined.
-    inline void registerWatch(EventLoop *eloop, int signo)
+    // a time, behaviour is undefined. The signal should be masked before
+    // call.
+    inline void addWatch(EventLoop &eloop, int signo)
     {
         BaseWatcher::init();
         this->siginfo.set_signo(signo);
-        eloop->registerSignal(this, signo);
+        eloop.registerSignal(this, signo);
     }
     
-    inline void deregisterWatch(EventLoop *eloop) noexcept
+    inline void deregister(EventLoop &eloop) noexcept
     {
-        eloop->deregister(this, this->siginfo.get_signo());
+        eloop.deregister(this, this->siginfo.get_signo());
     }
     
-    // virtual Rearm gotSignal(EventLoop<T_Mutex> *, int signo, SigInfo_p info) = 0;
+    // virtual Rearm received(EventLoop &, int signo, SigInfo_p info) = 0;
 };
 
 // Posix file descriptor event watcher
@@ -1021,7 +1050,7 @@ class FdWatcher : private dprivate::BaseFdWatcher<typename EventLoop::mutex_t>
     
     // Set the types of event to watch. Only supported if LoopTraits::has_bidi_fd_watch
     // is true; otherwise has unspecified behavior.
-    // Only safe to call from within the callback handler (gotEvent). Might not take
+    // Only safe to call from within the callback handler (fdEvent). Might not take
     // effect until the current callback handler returns with REARM.
     void setWatchFlags(int newFlags)
     {
@@ -1043,12 +1072,12 @@ class FdWatcher : private dprivate::BaseFdWatcher<typename EventLoop::mutex_t>
     // causes undefined behavior.
     //
     // Can fail with std::bad_alloc or std::system_error.
-    void registerWith(EventLoop *eloop, int fd, int flags)
+    void addWatch(EventLoop &eloop, int fd, int flags)
     {
         BaseWatcher::init();
         this->watch_fd = fd;
         this->watch_flags = flags;
-        eloop->registerFd(this, fd, flags);
+        eloop.registerFd(this, fd, flags);
     }
     
     // Deregister a file descriptor watcher.
@@ -1060,21 +1089,21 @@ class FdWatcher : private dprivate::BaseFdWatcher<typename EventLoop::mutex_t>
     // calling this method as long as the handler (if it is active) accesses no
     // internal state and returns Rearm::REMOVED.
     //   TODO: implement REMOVED, or correct above statement.
-    void deregisterWatch(EventLoop *eloop) noexcept
+    void deregister(EventLoop &eloop) noexcept
     {
-        eloop->deregister(this, this->watch_fd);
+        eloop.deregister(this, this->watch_fd);
     }
     
-    void setEnabled(EventLoop *eloop, bool enable) noexcept
+    void setEnabled(EventLoop &eloop, bool enable) noexcept
     {
-        std::lock_guard<T_Mutex> guard(eloop->getBaseLock());
-        eloop->setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable);
+        std::lock_guard<T_Mutex> guard(eloop.getBaseLock());
+        eloop.setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable);
         if (! enable) {
-            eloop->dequeueWatcher(this);
+            eloop.dequeueWatcher(this);
         }
     }
     
-    // virtual Rearm gotEvent(EventLoop<T_Mutex> *, int fd, int flags) = 0;
+    // virtual Rearm fdEvent(EventLoop<T_Mutex> *, int fd, int flags) = 0;
 };
 
 // A Bi-directional file descriptor watcher with independent read- and write- channels.
@@ -1086,7 +1115,7 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::mu
     using BaseWatcher = dprivate::BaseWatcher;
     using T_Mutex = typename EventLoop::mutex_t;
     
-    void setWatchEnabled(EventLoop *eloop, bool in, bool b)
+    void setWatchEnabled(EventLoop &eloop, bool in, bool b)
     {
         int events = in ? IN_EVENTS : OUT_EVENTS;
         
@@ -1098,18 +1127,18 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::mu
         }
         if (LoopTraits::has_separate_rw_fd_watches) {
             dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher;
-            eloop->setFdEnabled_nolock(watcher, this->watch_fd, events | ONE_SHOT, b);
+            eloop.setFdEnabled_nolock(watcher, this->watch_fd, events | ONE_SHOT, b);
             if (! b) {
-                eloop->dequeueWatcher(watcher);
+                eloop.dequeueWatcher(watcher);
             }
         }
         else {
-            eloop->setFdEnabled_nolock(this, this->watch_fd,
+            eloop.setFdEnabled_nolock(this, this->watch_fd,
                     (this->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT,
                     (this->watch_flags & (IN_EVENTS | OUT_EVENTS)) != 0);
             if (! b) {
                 dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher;
-                eloop->dequeueWatcher(watcher);
+                eloop.dequeueWatcher(watcher);
             }
         }
     }
@@ -1118,18 +1147,18 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::mu
     
     // TODO if a watch is disabled and currently queued, we should de-queue it.
     
-    void setInWatchEnabled(EventLoop *eloop, bool b) noexcept
+    void setInWatchEnabled(EventLoop &eloop, bool b) noexcept
     {
-        eloop->getBaseLock().lock();
+        eloop.getBaseLock().lock();
         setWatchEnabled(eloop, true, b);
-        eloop->getBaseLock().unlock();
+        eloop.getBaseLock().unlock();
     }
     
-    void setOutWatchEnabled(EventLoop *eloop, bool b) noexcept
+    void setOutWatchEnabled(EventLoop &eloop, bool b) noexcept
     {
-        eloop->getBaseLock().lock();
+        eloop.getBaseLock().lock();
         setWatchEnabled(eloop, false, b);
-        eloop->getBaseLock().unlock();
+        eloop.getBaseLock().unlock();
     }
     
     // Set the watch flags, which enables/disables both the in-watch and the out-watch accordingly.
@@ -1139,16 +1168,16 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::mu
     ///   - unless the event loop will not be polled while the watcher is active.
     // (i.e. it is ok to call setWatchFlags from within the readReady/writeReady handlers if no other
     //  thread will poll the event loop; it is ok to *dis*able a watcher that might be active).
-    void setWatchFlags(EventLoop * eloop, int newFlags)
+    void setWatches(EventLoop &eloop, int newFlags)
     {
-        std::lock_guard<T_Mutex> guard(eloop->getBaseLock());
+        std::lock_guard<T_Mutex> guard(eloop.getBaseLock());
         if (LoopTraits::has_separate_rw_fd_watches) {
             setWatchEnabled(eloop, true, (newFlags & IN_EVENTS) != 0);
             setWatchEnabled(eloop, false, (newFlags & OUT_EVENTS) != 0);
         }
         else {
             this->watch_flags = (this->watch_flags & ~IO_EVENTS) | newFlags;
-            eloop->setFdEnabled((dprivate::BaseWatcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true);
+            eloop.setFdEnabled((dprivate::BaseWatcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true);
         }
     }
     
@@ -1158,13 +1187,13 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::mu
     // can be any combination of dasynq::IN_EVENTS / dasynq::OUT_EVENTS.
     //
     // Can fail with std::bad_alloc or std::system_error.
-    void registerWith(EventLoop *eloop, int fd, int flags)
+    void addWatch(EventLoop &eloop, int fd, int flags)
     {
         BaseWatcher::init();
         this->outWatcher.BaseWatcher::init();
         this->watch_fd = fd;
         this->watch_flags = flags | dprivate::multi_watch;
-        eloop->registerFd(this, fd, flags);
+        eloop.registerFd(this, fd, flags);
     }
     
     // Deregister a bi-direction file descriptor watcher.
@@ -1176,9 +1205,9 @@ class BidiFdWatcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::mu
     // calling this method as long as the handler (if it is active) accesses no
     // internal state and returns Rearm::REMOVED.
     //   TODO: implement REMOVED, or correct above statement.
-    void deregisterWatch(EventLoop *eloop) noexcept
+    void deregister(EventLoop &eloop) noexcept
     {
-        eloop->deregister(this, this->watch_fd);
+        eloop.deregister(this, this->watch_fd);
     }
     
     // Rearm readReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
@@ -1194,31 +1223,129 @@ class ChildProcWatcher : private dprivate::BaseChildWatcher<typename EventLoop::
 
     public:
     // Reserve resources for a child watcher with the given event loop.
-    // Reservation can fail with std::bad_alloc.
-    void reserveWith(EventLoop *eloop)
+    // Reservation can fail with std::bad_alloc. Some backends do not support
+    // reservation (it will always fail) - check LoopTraits::supports_childwatch_reservation.
+    void reserveWatch(EventLoop &eloop)
     {
-        eloop->reserveChildWatch();
+        eloop.reserveChildWatch(this);
+    }
+    
+    void unreserve(EventLoop &eloop)
+    {
+        eloop.unreserve(this);
     }
     
     // Register a watcher for the given child process with an event loop.
     // Registration can fail with std::bad_alloc.
-    void registerWith(EventLoop *eloop, pid_t child)
+    // Note that in multi-threaded programs, use of this function may be prone to a
+    // race condition such that the child terminates before the watcher is registered.
+    void addWatch(EventLoop &eloop, pid_t child)
     {
         BaseWatcher::init();
         this->watch_pid = child;
-        eloop->registerChild(this, child);
+        eloop.registerChild(this, child);
     }
     
     // Register a watcher for the given child process with an event loop,
     // after having reserved resources previously (using reserveWith).
     // Registration cannot fail.
-    void registerReserved(EventLoop *eloop, pid_t child) noexcept
+    // Note that in multi-threaded programs, use of this function may be prone to a
+    // race condition such that the child terminates before the watcher is registered.
+    void addReserved(EventLoop &eloop, pid_t child) noexcept
     {
         BaseWatcher::init();
-        eloop->registerReservedChild(this, child);
+        eloop.registerReservedChild(this, child);
+    }
+    
+    void deregister(EventLoop &eloop, pid_t child) noexcept
+    {
+        eloop.deregister(this, child);
+    }
+    
+    // Fork and watch the child with this watcher on the given event loop.
+    // If resource limitations prevent the child process from being watched, it is
+    // terminated immediately (or if the implementation allows, never started),
+    // and a suitable std::system_error or std::bad_alloc exception is thrown.
+    // Returns:
+    // - the child pid in the parent
+    // - 0 in the child
+    pid_t fork(EventLoop &eloop)
+    {
+        if (EventLoop::LoopTraits::supports_childwatch_reservation) {
+            // Reserve a watch, fork, then claim reservation
+            reserveWatch(eloop);
+            
+            auto &lock = eloop.getBaseLock();
+            lock.lock();
+            
+            pid_t child = ::fork();
+            if (child == -1) {
+                // Unreserve watch.
+                lock.unlock();
+                unreserve(eloop);
+                throw std::system_error(errno, std::system_category());
+            }
+            
+            if (child == 0) {
+                // I am the child
+                lock.unlock(); // may not really be necessary
+                return 0;
+            }
+            
+            // Register this watcher.
+            eloop.registerReservedChild_nolock(this, child);
+            lock.unlock();
+            return child;
+        }
+        else {
+            int pipefds[2];
+            if (pipe2(pipefds, O_CLOEXEC) == -1) {
+                throw std::system_error(errno, std::system_category());
+            }
+            
+            std::lock_guard<T_Mutex> guard(eloop.getBaseLock());
+            
+            pid_t child = ::fork();
+            if (child == -1) {
+                throw std::system_error(errno, std::system_category());
+            }
+            
+            if (child == 0) {
+                // I am the child
+                
+                // Wait for message from parent before continuing:
+                int rr;
+                int r = read(pipefds[0], &rr, sizeof(rr));
+                while (r == -1 && errno == EINTR) {
+                    read(pipefds[0], &rr, sizeof(rr));
+                }
+                
+                if (r == -1) _exit(0);
+                
+                close(pipefds[0]);
+                return 0;
+            }
+            
+            close(pipefds[0]); // close read end
+            
+            // Register this watcher.
+            try {
+                eloop.registerChild(this, child);
+                
+                // Continue in child (it doesn't matter what is written):
+                write(pipefds[1], &pipefds, sizeof(int));
+                close(pipefds[1]);
+                
+                return child;
+            }
+            catch (...) {
+                close(pipefds[1]);
+                throw;
+            }
+        }
     }
     
-    // virtual void gotTermStat(EventLoop<T_Mutex> *, pid_t child, int status) = 0;
+    // virtual Rearm childStatus(EventLoop &, pid_t child, int status) = 0;
 };
 
 }  // namespace dasynq::dprivate
index 43062f0f15a400ca938c7d188be6b9f9d5dcc28b..292e7805111b5e717c5effd252b64936e9e976aa 100644 (file)
@@ -50,7 +50,7 @@ class BufferedLogStream : public EventLoop_t::FdWatcher
         release = false;
     }
     
-    Rearm gotEvent(EventLoop_t *loop, int fd, int flags) noexcept override;
+    Rearm fdEvent(EventLoop_t &loop, int fd, int flags) noexcept override;
 
     // Check whether the console can be released.
     void flushForRelease();
@@ -63,7 +63,7 @@ class BufferedLogStream : public EventLoop_t::FdWatcher
         bool was_first = current_index == 0;
         current_index = log_buffer.get_length();
         if (was_first && ! release) {
-            setEnabled(&eventLoop, true);
+            setEnabled(eventLoop, true);
         }
     }
     
@@ -106,15 +106,15 @@ void BufferedLogStream::flushForRelease()
     
     // Try to flush any messages that are currently buffered. (Console is non-blocking
     // so it will fail gracefully).
-    if (gotEvent(&eventLoop, fd, OUT_EVENTS) == Rearm::DISARM) {
+    if (fdEvent(eventLoop, fd, OUT_EVENTS) == Rearm::DISARM) {
         // Console has already been released at this point.
-        setEnabled(&eventLoop, false);
+        setEnabled(eventLoop, false);
     }
-    // gotEvent didn't want to disarm, so must be partway through a message; will
+    // fdEvent didn't want to disarm, so must be partway through a message; will
     // release when it's finished.
 }
 
-Rearm BufferedLogStream::gotEvent(EventLoop_t *loop, int fd, int flags) noexcept
+Rearm BufferedLogStream::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept
 {
     auto &log_stream = *this;
     
@@ -223,7 +223,7 @@ Rearm BufferedLogStream::gotEvent(EventLoop_t *loop, int fd, int flags) noexcept
 void init_log(ServiceSet *sset)
 {
     service_set = sset;
-    log_stream[DLOG_CONS].registerWith(&eventLoop, STDOUT_FILENO, OUT_EVENTS); // TODO register in disabled state
+    log_stream[DLOG_CONS].addWatch(eventLoop, STDOUT_FILENO, OUT_EVENTS); // TODO register in disabled state
     enable_console_log(true);
 }
 
@@ -232,7 +232,7 @@ void init_log(ServiceSet *sset)
 void setup_main_log(int fd)
 {
     log_stream[DLOG_MAIN].init(fd);
-    log_stream[DLOG_MAIN].registerWith(&eventLoop, fd, OUT_EVENTS);
+    log_stream[DLOG_MAIN].addWatch(eventLoop, fd, OUT_EVENTS);
 }
 
 bool is_log_flushed() noexcept
@@ -253,7 +253,7 @@ void enable_console_log(bool enable) noexcept
         fcntl(1, F_SETFL, flags | O_NONBLOCK);
         // Activate watcher:
         log_stream[DLOG_CONS].init(STDOUT_FILENO);
-        log_stream[DLOG_CONS].setEnabled(&eventLoop, true);
+        log_stream[DLOG_CONS].setEnabled(eventLoop, true);
     }
     else if (! enable && log_to_console) {
         log_stream[DLOG_CONS].flushForRelease();
index 7027cca9f160693c67be4473f504f06519cb25ee..5fd99d5630f7cff52a5279ca95aa2c7d61f867e9 100644 (file)
@@ -75,9 +75,9 @@ static void control_socket_cb(EventLoop_t *loop, int fd);
 
 class ControlSocketWatcher : public EventLoop_t::FdWatcher
 {
-    Rearm gotEvent(EventLoop_t * loop, int fd, int flags)
+    Rearm fdEvent(EventLoop_t &loop, int fd, int flags) override
     {
-        control_socket_cb(loop, fd);
+        control_socket_cb(&loop, fd);
         return Rearm::REARM;
     }
     
@@ -85,10 +85,10 @@ class ControlSocketWatcher : public EventLoop_t::FdWatcher
     // TODO the fd is already stored, must we really store it again...
     int fd;
     
-    void registerWith(EventLoop_t * loop, int fd, int flags)
+    void addWatch(EventLoop_t &loop, int fd, int flags)
     {
         this->fd = fd;
-        EventLoop_t::FdWatcher::registerWith(loop, fd, flags);
+        EventLoop_t::FdWatcher::addWatch(loop, fd, flags);
     }
 };
 
@@ -150,7 +150,7 @@ namespace {
             this->cb_func = cb_func;
         }
         
-        Rearm gotSignal(EventLoop_t * eloop, int signo, SigInfo_p siginfo) override
+        Rearm received(EventLoop_t &eloop, int signo, SigInfo_p siginfo) override
         {
             service_set->stop_all_services(ShutdownType::REBOOT);
             return Rearm::REARM;
@@ -159,9 +159,9 @@ namespace {
 
     class ControlSocketWatcher : public EventLoop_t::FdWatcher
     {
-        Rearm gotEvent(EventLoop_t * loop, int fd, int flags)
+        Rearm fdEvent(EventLoop_t &loop, int fd, int flags)
         {
-            control_socket_cb(loop, fd);
+            control_socket_cb(&loop, fd);
             return Rearm::REARM;
         }
     };
@@ -319,9 +319,9 @@ int main(int argc, char **argv)
     
     auto sigterm_watcher = CallbackSignalHandler(sigterm_cb);
     
-    sigint_watcher.registerWatch(&eventLoop, SIGINT);
-    sigquit_watcher.registerWatch(&eventLoop, SIGQUIT);
-    sigterm_watcher.registerWatch(&eventLoop, SIGTERM);
+    sigint_watcher.addWatch(eventLoop, SIGINT);
+    sigquit_watcher.addWatch(eventLoop, SIGQUIT);
+    sigterm_watcher.addWatch(eventLoop, SIGTERM);
 
     // Try to open control socket (may fail due to readonly filesystem)
     open_control_socket(&eventLoop);
@@ -505,7 +505,7 @@ static void open_control_socket(EventLoop_t *loop) noexcept
         }
 
         control_socket_open = true;
-        control_socket_io.registerWith(&eventLoop, sockfd, IN_EVENTS);
+        control_socket_io.addWatch(eventLoop, sockfd, IN_EVENTS);
     }
 }
 
@@ -513,7 +513,7 @@ static void close_control_socket(EventLoop_t *loop) noexcept
 {
     if (control_socket_open) {
         int fd = control_socket_io.fd;
-        control_socket_io.deregisterWatch(&eventLoop);
+        control_socket_io.deregister(*loop);
         close(fd);
         
         // Unlink the socket:
index 92328a26e580a03765459c43535aed5d579b2614..770f8759429d8ab8a2e12bbe0ae4a9393317c5b3 100644 (file)
@@ -101,7 +101,7 @@ void ServiceRecord::stopped() noexcept
     }
 }
 
-void ServiceChildWatcher::gotTermStat(EventLoop_t * loop, pid_t child, int status) noexcept
+dasynq::Rearm ServiceChildWatcher::childStatus(EventLoop_t &loop, pid_t child, int status) noexcept
 {
     ServiceRecord *sr = service;
     
@@ -116,10 +116,14 @@ void ServiceChildWatcher::gotTermStat(EventLoop_t * loop, pid_t child, int statu
     if (sr->waiting_for_execstat) {
         // We still don't have an exec() status from the forked child, wait for that
         // before doing any further processing.
-        return;
+        return Rearm::REMOVE;
     }
     
-    sr->handle_exit_status();    
+    // Must deregister now since handle_exit_status might result in re-launch:
+    deregister(loop, child);
+    
+    sr->handle_exit_status();
+    return Rearm::REMOVED;
 }
 
 bool ServiceRecord::do_auto_restart() noexcept
@@ -219,9 +223,9 @@ void ServiceRecord::handle_exit_status() noexcept
     }
 }
 
-Rearm ServiceIoWatcher::gotEvent(EventLoop_t *loop, int fd, int flags) noexcept
+Rearm ServiceIoWatcher::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept
 {
-    ServiceRecord::process_child_status(loop, this, flags);
+    ServiceRecord::process_child_status(&loop, this, flags);
     return Rearm::REMOVED;
 }
 
@@ -234,7 +238,7 @@ void ServiceRecord::process_child_status(EventLoop_t *loop, ServiceIoWatcher * s
     int exec_status;
     int r = read(stat_io->fd, &exec_status, sizeof(int));
     close(stat_io->fd);
-    stat_io->deregisterWatch(loop);
+    stat_io->deregister(*loop);
     
     if (r > 0) {
         // We read an errno code; exec() failed, and the service startup failed.
@@ -528,7 +532,7 @@ bool ServiceRecord::read_pid_file() noexcept
             pidbuf[r] = 0; // store nul terminator
             pid = std::atoi(pidbuf);
             if (kill(pid, 0) == 0) {                
-                child_listener.registerWith(&eventLoop, pid);
+                child_listener.addWatch(eventLoop, pid);
             }
             else {
                 log(LogLevel::ERROR, service_name, ": pid read from pidfile (", pid, ") is not valid");
@@ -640,10 +644,20 @@ bool ServiceRecord::start_ps_process(const std::vector<const char *> &cmd, bool
 
     // TODO make sure pipefd's are not 0/1/2 (STDIN/OUT/ERR) - if they are, dup them
     // until they are not.
-
-    pid_t forkpid = fork();
-    if (forkpid == -1) {
-        // TODO log error
+    
+    pid_t forkpid;
+    
+    bool child_status_registered = false;    
+    try {
+        child_status_listener.addWatch(eventLoop, pipefd[0], IN_EVENTS);
+        child_status_registered = true;
+        
+        forkpid = child_listener.fork(eventLoop);
+    }
+    catch (...) {
+        if (child_status_registered) {
+            child_status_listener.deregister(eventLoop);
+        }
         close(pipefd[0]);
         close(pipefd[1]);
         return false;
@@ -721,17 +735,8 @@ bool ServiceRecord::start_ps_process(const std::vector<const char *> &cmd, bool
     else {
         // Parent process
         close(pipefd[1]); // close the 'other end' fd
-
         pid = forkpid;
 
-        // Listen for status
-        // TODO should set this up earlier so we can handle failure case (exception)
-        child_status_listener.registerWith(&eventLoop, pipefd[0], IN_EVENTS);
-
-        // Add a process listener so we can detect when the
-        // service stops
-        // TODO should reserve listener, handle exceptions
-        child_listener.registerWith(&eventLoop, pid);
         waiting_for_execstat = true;
         return true;
     }
index 27c6a37deefd20a9e2edb9ad79fcd58c09679ca7..a45471dac01f76d3f400890430a49dd3cfd1b559 100644 (file)
@@ -185,7 +185,7 @@ class ServiceChildWatcher : public EventLoop_t::ChildProcWatcher
     public:
     // TODO resolve clunkiness of storing this field
     ServiceRecord * service;
-    void gotTermStat(EventLoop_t * eloop, pid_t child, int status) noexcept;
+    Rearm childStatus(EventLoop_t &eloop, pid_t child, int status) noexcept;
     
     ServiceChildWatcher(ServiceRecord * sr) noexcept : service(sr) { }
 };
@@ -196,14 +196,14 @@ class ServiceIoWatcher : public EventLoop_t::FdWatcher
     // TODO resolve clunkiness of storing these fields
     int fd;
     ServiceRecord * service;
-    Rearm gotEvent(EventLoop_t * eloop, int fd, int flags) noexcept;
+    Rearm fdEvent(EventLoop_t &eloop, int fd, int flags) noexcept;
     
     ServiceIoWatcher(ServiceRecord * sr) noexcept : service(sr) { }
     
-    void registerWith(EventLoop_t *loop, int fd, int flags)
+    void addWatch(EventLoop_t &loop, int fd, int flags)
     {
         this->fd = fd;
-        EventLoop_t::FdWatcher::registerWith(loop, fd, flags);
+        EventLoop_t::FdWatcher::addWatch(loop, fd, flags);
     }
 };