Incorporate Dasynq changes
authorDavin McCall <davmac@davmac.org>
Thu, 1 Jun 2017 07:52:22 +0000 (08:52 +0100)
committerDavin McCall <davmac@davmac.org>
Thu, 1 Jun 2017 07:52:22 +0000 (08:52 +0100)
src/dasynq/dasynq-childproc.h
src/dasynq/dasynq-config.h
src/dasynq/dasynq-epoll.h
src/dasynq/dasynq-itimer.h
src/dasynq/dasynq-kqueue.h
src/dasynq/dasynq-posixtimer.h [new file with mode: 0644]
src/dasynq/dasynq-timerfd.h
src/dasynq/dasynq.h

index 62e5f636d2e3e3ac2e49f9cf1dbafa276e40ddb3..de663dbaaa6119b57e7d39dba8842ba179b4341d 100644 (file)
@@ -101,7 +101,7 @@ template <class Base> class ChildProcEvents : public Base
     using SigInfo = typename Base::SigInfo;
     
     template <typename T>
-    bool receiveSignal(T & loop_mech, SigInfo &siginfo, void *userdata)
+    bool receive_signal(T & loop_mech, SigInfo &siginfo, void *userdata)
     {
         if (siginfo.get_signo() == SIGCHLD) {
             int status;
@@ -115,7 +115,7 @@ template <class Base> class ChildProcEvents : public Base
             return false; // leave signal watch enabled
         }
         else {
-            return Base::receiveSignal(loop_mech, siginfo, userdata);
+            return Base::receive_signal(loop_mech, siginfo, userdata);
         }
     }
     
index 0ef66cd6369f6e9ed99181ad80e856aa241baa95..6c1cbcae1d60ae60600643dadfb01df3ea6af796 100644 (file)
@@ -1,7 +1,7 @@
 #ifndef DASYNQ_CONFIG_H_INCLUDED
 #define DASYNQ_CONFIG_H_INCLUDED
 
-#if defined(__OpenBSD__) || defined(__APPLE__)
+#if defined(__OpenBSD__) || defined(__APPLE__) || defined(__FreeBSD__)
 #define DASYNQ_HAVE_KQUEUE 1
 #endif
 
index c66e85e4ed8be1b80aec4941ed9e7d28a4fa03cd..cdc70466e2e626fe86fab002d7553e9135d46a9e 100644 (file)
@@ -78,7 +78,7 @@ template <class Base> class EpollLoop : public Base
     // Base contains:
     //   lock - a lock that can be used to protect internal structure.
     //          receive*() methods will be called with lock held.
-    //   receiveSignal(SigInfo &, user *) noexcept
+    //   receive_signal(SigInfo &, user *) noexcept
     //   receiveFdEvent(FD_r, user *, int flags) noexcept
     
     using SigInfo = EpollTraits::SigInfo;
@@ -100,7 +100,7 @@ template <class Base> class EpollLoop : public Base
                     auto iter = sigdataMap.find(siginfo.get_signo());
                     if (iter != sigdataMap.end()) {
                         void *userdata = (*iter).second;
-                        if (Base::receiveSignal(*this, siginfo, userdata)) {
+                        if (Base::receive_signal(*this, siginfo, userdata)) {
                             sigdelset(&sigmask, siginfo.get_signo());
                         }
                     }
@@ -143,8 +143,14 @@ template <class Base> class EpollLoop : public Base
         }
     }
     
-    // flags:  IN_EVENTS | OUT_EVENTS | ONE_SHOT
-    void addFdWatch(int fd, void *userdata, int flags, bool enabled = true)
+    //        fd:  file descriptor to watch
+    //  userdata:  data to associate with descriptor
+    //     flags:  IN_EVENTS | OUT_EVENTS | ONE_SHOT
+    // soft_fail:  true if unsupported file descriptors should fail by returning false instead
+    //             of throwing an exception
+    // returns: true on success; false if file descriptor type isn't supported and soft_fail == true
+    // throws:  std::system_error or std::bad_alloc on failure
+    bool addFdWatch(int fd, void *userdata, int flags, bool enabled = true, bool soft_fail = false)
     {
         struct epoll_event epevent;
         // epevent.data.fd = fd;
@@ -162,11 +168,15 @@ template <class Base> class EpollLoop : public Base
         }
 
         if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epevent) == -1) {
+            if (soft_fail && errno == EPERM) {
+                return false;
+            }
             throw new std::system_error(errno, std::system_category());        
         }
+        return true;
     }
     
-    void addBidiFdWatch(int fd, void *userdata, int flags)
+    bool addBidiFdWatch(int fd, void *userdata, int flags, bool emulate)
     {
         // No implementation.
         throw std::system_error(std::make_error_code(std::errc::not_supported));
@@ -309,25 +319,10 @@ template <class Base> class EpollLoop : public Base
             return;
         }
     
-        processEvents(events, r);
-    }
-
-    // If events are pending, process one of them.
-    // If no events are pending, wait until one event is received and
-    // process this event.
-    //
-    //  do_wait - if false, returns immediately if no events are
-    //            pending.    
-    void pullOneEvent(bool do_wait)
-    {
-        epoll_event events[1];
-        int r = epoll_wait(epfd, events, 1, do_wait ? -1 : 0);
-        if (r == -1 || r == 0) {
-            // signal or no events
-            return;
-        }
-    
-        processEvents(events, r);    
+        do {
+            processEvents(events, r);
+            r = epoll_wait(epfd, events, 16, 0);
+        } while (r > 0);
     }
 };
 
index 4f1497843076b749a6128952782b5389dcb57c9a..4045097ea6877a32a7bb518e5ef531da78c2aad8 100644 (file)
@@ -8,14 +8,27 @@
 
 namespace dasynq {
 
-// Timer implementation based on the (basically obselete) POSIX itimer interface.
+// Timer implementation based on the (basically obsolete) POSIX itimer interface.
 
 template <class Base> class ITimerEvents : public timer_base<Base>
 {
     private:
-    int timerfd_fd = -1;
-
     timer_queue_t timer_queue;
+
+#if defined(CLOCK_MONOTONIC)
+    static inline void get_curtime(struct timespec &curtime)
+    {
+        clock_gettime(CLOCK_MONOTONIC, &curtime);
+    }
+#else
+    static inline void get_curtime(struct timespec &curtime)
+    {
+        struct timeval curtime_tv;
+        gettimeofday(&curtime_tv, nullptr);
+        curtime.tv_sec = curtime_tv.tv_sec;
+        curtime.tv_nsec = curtime_tv.tv_usec * 1000;
+    }
+#endif
     
     // Set the timerfd timeout to match the first timer in the queue (disable the timerfd
     // if there are no active timers).
@@ -33,14 +46,7 @@ template <class Base> class ITimerEvents : public timer_base<Base>
         newtime = timer_queue.get_root_priority();
         
         struct timespec curtime;
-#if defined(__APPLE__)
-        struct timeval curtime_tv;
-        gettimeofday(&curtime_tv, nullptr);
-        curtime.tv_sec = curtime_tv.tv_sec;
-        curtime.tv_nsec = curtime_tv.tv_usec * 1000;
-#else
-        clock_gettime(CLOCK_MONOTONIC, &curtime);
-#endif
+        get_curtime(curtime);
         newalarm.it_interval = {0, 0};
         newalarm.it_value.tv_sec = newtime.tv_sec - curtime.tv_sec;
         newalarm.it_value.tv_usec = (newtime.tv_nsec - curtime.tv_nsec) / 1000;
@@ -57,19 +63,11 @@ template <class Base> class ITimerEvents : public timer_base<Base>
     using SigInfo = typename Base::SigInfo;
 
     template <typename T>
-    bool receiveSignal(T & loop_mech, SigInfo &siginfo, void *userdata)
+    bool receive_signal(T & loop_mech, SigInfo &siginfo, void *userdata)
     {
         if (siginfo.get_signo() == SIGALRM) {
             struct timespec curtime;
-#if defined(__APPLE__)
-            struct timeval curtime_tv;
-            gettimeofday(&curtime_tv, nullptr);
-            curtime.tv_sec = curtime_tv.tv_sec;
-            curtime.tv_nsec = curtime_tv.tv_usec * 1000;
-#else
-            clock_gettime(CLOCK_MONOTONIC, &curtime);
-            // should we use the REALTIME clock instead? I have no idea :/
-#endif
+            get_curtime(curtime);
             
             timer_base<Base>::process_timer_queue(timer_queue, curtime);
 
@@ -79,7 +77,7 @@ template <class Base> class ITimerEvents : public timer_base<Base>
             return false; // don't disable signal watch
         }
         else {
-            return Base::receiveSignal(loop_mech, siginfo, userdata);
+            return Base::receive_signal(loop_mech, siginfo, userdata);
         }
     }
         
@@ -146,21 +144,14 @@ template <class Base> class ITimerEvents : public timer_base<Base>
     {
         // TODO consider caching current time somehow; need to decide then when to update cached value.
         struct timespec curtime;
-#if defined(__APPLE__)
-        struct timeval curtime_tv;
-        gettimeofday(&curtime_tv, nullptr);
-        curtime.tv_sec = curtime_tv.tv_sec;
-        curtime.tv_nsec = curtime_tv.tv_usec * 1000;
-#else
-        clock_gettime(CLOCK_MONOTONIC, &curtime);
-#endif
+        get_curtime(curtime);
         curtime.tv_sec += timeout.tv_sec;
         curtime.tv_nsec += timeout.tv_nsec;
         if (curtime.tv_nsec > 1000000000) {
             curtime.tv_nsec -= 1000000000;
             curtime.tv_sec++;
         }
-        setTimer(timer_id, curtime, interval, enable);
+        setTimer(timer_id, curtime, interval, enable, clock);
     }
     
     // Enables or disabling report of timeouts (does not stop timer)
@@ -199,6 +190,11 @@ template <class Base> class ITimerEvents : public timer_base<Base>
             }
         }
     }
+
+    void get_time(timespec &ts, clock_type clock, bool force_update) noexcept
+    {
+        get_curtime(ts);
+    }
 };
 
 }
index 6493ba857c96c94644efc72b8029e1727fd8fe63..a098a084db4db5a3f3f5a4e230aaba7f1b605beb 100644 (file)
@@ -22,6 +22,18 @@ extern "C" {
 
 #include "dasynq-config.h"
 
+// "kqueue"-based event loop mechanism.
+//
+// kqueue is available on BSDs and Mac OS X, though there are subtle differences from OS to OS.
+//
+// kqueue supports watching file descriptors (input and output as separate watches only),
+// signals, child processes, and timers. Unfortunately support for the latter two is imperfect;
+// it is not possible to reserve process watches in advance; timers can only be active, count
+// down immediately when created, and cannot be reset to another time. For timers especially
+// the problems are significant: we can't allocate timers in advance, and we can't even feasibly
+// manage our own timer queue via a single kqueue-backed timer. Therefore, an alternate timer
+// mechanism must be used together with kqueue.
+
 namespace dasynq {
 
 template <class Base> class KqueueLoop;
@@ -74,13 +86,13 @@ class KqueueTraits
     const static bool supports_childwatch_reservation = true;
 };
 
-#if defined(__OpenBSD__)
+#if defined(__OpenBSD__) && _POSIX_REALTIME_SIGNALS <= 0
 // OpenBSD has no sigtimedwait (or sigwaitinfo) but does have "__thrsigdivert", which is
 // essentially an incomplete version of the same thing. Discussion with OpenBSD developer
-// Ted Unangst suggested that the siginfo_t structure returned might not always have all
-// fields set correctly. Furthermore there is a bug such that specifying a zero timeout (or
-// indeed any timeout less than a tick) results in NO timeout. We get around this by instead
-// specifying an *invalid* timeout, which won't error out if a signal is pending.
+// Ted Unangst suggested that the siginfo_t structure returned might not always have all fields
+// set correctly. Furthermore there is a bug (at least in 5.9)  such that specifying a zero
+// timeout (or indeed any timeout less than a tick) results in NO timeout. We get around this by
+// instead specifying an *invalid* timeout, which won't error out if a signal is pending.
 static inline int sigtimedwait(const sigset_t *ssp, siginfo_t *info, struct timespec *timeout)
 {
     // We know that we're only called with a timeout of 0 (which doesn't work properly) and
@@ -89,7 +101,9 @@ static inline int sigtimedwait(const sigset_t *ssp, siginfo_t *info, struct time
     timeout->tv_nsec = 1000000001;
     return __thrsigdivert(*ssp, info, timeout);
 }
+#endif
 
+#if defined(__OpenBSD__) || _POSIX_REALTIME_SIGNALS > 0
 static inline void prepare_signal(int signo) { }
 static inline void unprep_signal(int signo) { }
 
@@ -104,8 +118,10 @@ static bool get_siginfo(int signo, siginfo_t *siginfo)
     sigaddset(&mask, signo);
     return (sigtimedwait(&mask, siginfo, &timeout) != -1);
 }
+#else
 
-#elif defined(__APPLE__)
+// If we have no sigtimedwait implementation, we have to retrieve signal data by establishing a
+// signal handler:
 
 static siginfo_t * siginfo_p;
 
@@ -178,7 +194,7 @@ template <class Base> class KqueueLoop : public Base
             if (events[i].filter == EVFILT_SIGNAL) {
                 SigInfo siginfo;
                 if (get_siginfo(events[i].ident, &siginfo.info)
-                        && Base::receiveSignal(*this, siginfo, (void *)events[i].udata)) {
+                        && Base::receive_signal(*this, siginfo, (void *)events[i].udata)) {
                     sigdelset(&sigmask, events[i].ident);
                     events[i].flags = EV_DISABLE;
                 }
@@ -241,33 +257,103 @@ template <class Base> class KqueueLoop : public Base
         kevent(kqfd, &kev, 1, nullptr, 0, nullptr);    
     }
     
-    // flags:  IN_EVENTS | OUT_EVENTS
-    void addFdWatch(int fd, void *userdata, int flags, bool enabled = true)
+    //        fd:  file descriptor to watch
+    //  userdata:  data to associate with descriptor
+    //     flags:  IN_EVENTS | OUT_EVENTS | ONE_SHOT
+    //             (only one of IN_EVENTS/OUT_EVENTS can be specified)
+    // soft_fail:  true if unsupported file descriptors should fail by returning false instead
+    //             of throwing an exception
+    // returns: true on success; false if file descriptor type isn't supported and soft_fail == true
+    // throws:  std::system_error or std::bad_alloc on failure
+    bool addFdWatch(int fd, void *userdata, int flags, bool enabled = true, bool emulate = false)
     {
-        // TODO kqueue doesn't support EVFILT_WRITE on file fd's :/
-        // Presumably they cause the kevent call to fail. We could maintain
-        // a separate set and use poll() (urgh).
-        
         short filter = (flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE;
-        
+
         struct kevent kev;
         EV_SET(&kev, fd, filter, EV_ADD | (enabled ? 0 : EV_DISABLE), 0, 0, userdata);
         if (kevent(kqfd, &kev, 1, nullptr, 0, nullptr) == -1) {
+            // Note that kqueue supports EVFILT_READ on regular file fd's, but not EVFIL_WRITE.
+            if (filter == EVFILT_WRITE && errno == EINVAL) {
+                return false; // emulate
+            }
             throw new std::system_error(errno, std::system_category());
         }
+        return true;
     }
-    
-    void addBidiFdWatch(int fd, void *userdata, int flags)
+
+    // returns: 0 on success
+    //          IN_EVENTS  if in watch requires emulation
+    //          OUT_EVENTS if out watch requires emulation
+    int addBidiFdWatch(int fd, void *userdata, int flags, bool emulate = false)
     {
+#ifdef EV_RECEIPT
         struct kevent kev[2];
+        struct kevent kev_r[2];
+        short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT;
+        short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT;
+        EV_SET(&kev[0], fd, EVFILT_READ, rflags, 0, 0, userdata);
+        EV_SET(&kev[1], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
+
+        int r = kevent(kqfd, kev, 2, kev_r, 2, nullptr);
+
+        if (r == -1) {
+            throw new std::system_error(errno, std::system_category());
+        }
+
+        // Some possibilities:
+        // - both ends failed. We'll throw an error rather than allowing emulation.
+        // - read watch failed, write succeeded : should not happen.
+        // - read watch added, write failed: if emulate == true, succeed;
+        //                                   if emulate == false, remove read and fail.
+
+        if (kev_r[0].data != 0) {
+            // read failed
+            throw new std::system_error(kev_r[0].data, std::system_category());
+        }
+
+        if (kev_r[1].data != 0) {
+            if (emulate) {
+                return OUT_EVENTS;
+            }
+            // remove read watch
+            EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
+            kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+            // throw exception
+            throw new std::system_error(kev_r[1].data, std::system_category());
+        }
+
+        return 0;
+#else
+        // OpenBSD doesn't have EV_RECEIPT: install the watches one at a time
+        struct kevent kev[1];
+
         short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE);
         short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE);
         EV_SET(&kev[0], fd, EVFILT_READ, rflags, 0, 0, userdata);
-        EV_SET(&kev[1], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
-        
-        if (kevent(kqfd, kev, 2, nullptr, 0, nullptr) == -1) {
+
+        int r = kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+
+        if (r == -1) {
             throw new std::system_error(errno, std::system_category());
-        }        
+        }
+
+        EV_SET(&kev[0], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
+
+        r = kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+
+        if (r == -1) {
+            if (emulate) {
+                return OUT_EVENTS;
+            }
+            // remove read watch
+            EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
+            kevent(kqfd, kev, 1, nullptr, 0, nullptr);
+            // throw exception
+            throw new std::system_error(errno, std::system_category());
+        }
+
+        return 0;
+#endif
     }
     
     // flags specifies which watch to remove; ignored if the loop doesn't support
@@ -323,7 +409,7 @@ template <class Base> class KqueueLoop : public Base
 
         struct kevent evt;
         EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ADD, 0, 0, userdata);
-        // TODO use EV_DISPATCH if available (not on OpenBSD)
+        // TODO use EV_DISPATCH if available (not on OpenBSD/OS X)
         
         if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) {
             throw new std::system_error(errno, std::system_category());
@@ -359,31 +445,22 @@ template <class Base> class KqueueLoop : public Base
         removeSignalWatch_nolock(signo);
     }
     
-    // If events are pending, process an unspecified number of them.
-    // If no events are pending, wait until one event is received and
-    // process this event (and possibly any other events received
-    // simultaneously).
-    // If processing an event removes a watch, there is a possibility
-    // that the watched event will still be reported (if it has
-    // occurred) before pullEvents() returns.
+    private:
+
+    // We actually need to check pending signals before polling the kqueue, since kqueue can
+    // count signals as they are delivered but the count is cleared when we poll the kqueue,
+    // meaning that signals might still be pending if they were queued multiple times at the
+    // last poll (since we report only one signal delivery at a time and the watch is
+    // automatically disabled each time).
     //
-    //  do_wait - if false, returns immediately if no events are
-    //            pending.
-    void pullEvents(bool do_wait)
+    // The check is not necessary on systems that don't queue signals.
+void pull_signals()
     {
-        // We actually need to check pending signals, since
-        // kqueue can count signals as they are delivered but the count is
-        // cleared when we poll the kqueue, meaning that signals might still
-        // be pending if they were queued multiple times at the last poll.
-        
+#if _POSIX_REALTIME_SIGNALS > 0
         // TODO we should only poll for signals that *have* been reported
         // as being raised more than once prior via kevent, rather than all
-        // signals that have been registered - in many cases that will allow
+        // signals that have been registered - in many cases that may allow
         // us to skip the sigtimedwait call altogether.
-        
-        // The check is not necessary on systems that don't queue signals.
-
-#if _POSIX_REALTIME_SIGNALS > 0
         {
             std::lock_guard<decltype(Base::lock)> guard(Base::lock);
 
@@ -397,46 +474,44 @@ template <class Base> class KqueueLoop : public Base
                     sigdelset(&sigmask, rsigno);
                     // TODO accumulate and disable multiple filters with a single kevents call
                     //      rather than disabling each individually
-                    setFilterEnabled(EVFILT_SIGNAL, rsigno, false);
+                    setFilterEnabled(EVFILT_SIGNAL, rsigno, sigdataMap[rsigno], false);
                 }
                 rsigno = sigtimedwait(&sigmask, &siginfo.info, &timeout);
             }
         }
 #endif
-        
-        struct kevent events[16];
-        struct timespec ts;
-        ts.tv_sec = 0;
-        ts.tv_nsec = 0;
-        int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts);
-        if (r == -1 || r == 0) {
-            // signal or no events
-            return;
-        }
-        
-        processEvents(events, r);
     }
 
-    // If events are pending, process one of them.
+    public:
+
+    // If events are pending, process an unspecified number of them.
     // If no events are pending, wait until one event is received and
-    // process this event.
+    // process this event (and possibly any other events received
+    // simultaneously).
+    // If processing an event removes a watch, there is a possibility
+    // that the watched event will still be reported (if it has
+    // occurred) before pullEvents() returns.
     //
     //  do_wait - if false, returns immediately if no events are
-    //            pending.    
-    void pullOneEvent(bool do_wait)
+    //            pending.
+    void pullEvents(bool do_wait)
     {
-        // TODO must check for pending signals as per pullEvents()
-        struct kevent events[1];
+        pull_signals();
+        
+        struct kevent events[16];
         struct timespec ts;
         ts.tv_sec = 0;
         ts.tv_nsec = 0;
-        int r = kevent(kqfd, nullptr, 0, events, 1, do_wait ? nullptr : &ts);
+        int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts);
         if (r == -1 || r == 0) {
             // signal or no events
             return;
         }
-    
-        processEvents(events, r);
+        
+        do {
+            processEvents(events, r);
+            r = kevent(kqfd, nullptr, 0, events, 16, &ts);
+        } while (r > 0);
     }
 };
 
diff --git a/src/dasynq/dasynq-posixtimer.h b/src/dasynq/dasynq-posixtimer.h
new file mode 100644 (file)
index 0000000..8131489
--- /dev/null
@@ -0,0 +1,248 @@
+#include <vector>
+#include <utility>
+
+#include <sys/time.h>
+#include <time.h>
+#include <signal.h>
+
+#include "dasynq-config.h"
+#include "dasynq-timerbase.h"
+
+namespace dasynq {
+
+// Timer implementation based on POSIX create_timer et al.
+// May require linking with -lrt
+
+template <class Base> class PosixTimerEvents : public timer_base<Base>
+{
+    private:
+    timer_queue_t real_timer_queue;
+    timer_queue_t mono_timer_queue;
+
+    timer_t real_timer;
+    timer_t mono_timer;
+
+    // Set the timeout to match the first timer in the queue (disable the timer if there are no
+    // active timers).
+    void set_timer_from_queue(timer_t &timer, timer_queue_t &timer_queue)
+    {
+        struct itimerspec newalarm;
+
+        if (timer_queue.empty()) {
+            newalarm.it_value = {0, 0};
+            newalarm.it_interval = {0, 0};
+            timer_settime(timer, TIMER_ABSTIME, &newalarm, nullptr);
+            return;
+        }
+
+        newalarm.it_interval = {0, 0};
+        newalarm.it_value = timer_queue.get_root_priority();
+        timer_settime(timer, TIMER_ABSTIME, &newalarm, nullptr);
+    }
+
+    protected:
+
+    using SigInfo = typename Base::SigInfo;
+
+    template <typename T>
+    bool receive_signal(T & loop_mech, SigInfo &siginfo, void *userdata)
+    {
+        if (siginfo.get_signo() == SIGALRM) {
+            struct timespec curtime;
+
+            if (! real_timer_queue.empty()) {
+                clock_gettime(CLOCK_REALTIME, &curtime);
+                timer_base<Base>::process_timer_queue(real_timer_queue, curtime);
+                set_timer_from_queue(real_timer, real_timer_queue);
+            }
+
+            if (! mono_timer_queue.empty()) {
+                clock_gettime(CLOCK_MONOTONIC, &curtime);
+                timer_base<Base>::process_timer_queue(mono_timer_queue, curtime);
+                set_timer_from_queue(mono_timer, mono_timer_queue);
+            }
+
+            // loop_mech.rearmSignalWatch_nolock(SIGALRM);
+            return false; // don't disable signal watch
+        }
+        else {
+            return Base::receive_signal(loop_mech, siginfo, userdata);
+        }
+    }
+
+    timer_queue_t &queue_for_clock(clock_type clock)
+    {
+        switch (clock) {
+        case clock_type::MONOTONIC:
+            return mono_timer_queue;
+        case clock_type::SYSTEM:
+            return real_timer_queue;
+        default:
+            DASYNQ_UNREACHABLE;
+        }
+    }
+
+    timer_t &timer_for_clock(clock_type clock)
+    {
+        switch (clock) {
+        case clock_type::MONOTONIC:
+            return mono_timer;
+        case clock_type::SYSTEM:
+            return real_timer;
+        default:
+            DASYNQ_UNREACHABLE;
+        }
+    }
+
+    public:
+
+    template <typename T> void init(T *loop_mech)
+    {
+        sigset_t sigmask;
+        sigprocmask(SIG_UNBLOCK, nullptr, &sigmask);
+        sigaddset(&sigmask, SIGALRM);
+        sigprocmask(SIG_SETMASK, &sigmask, nullptr);
+        loop_mech->addSignalWatch(SIGALRM, nullptr);
+
+        struct sigevent timer_sigevent;
+        timer_sigevent.sigev_notify = SIGEV_SIGNAL;
+        timer_sigevent.sigev_signo = SIGALRM;
+        timer_sigevent.sigev_value.sival_int = 0;
+
+        // Create the timers; throw std::system_error if we can't.
+        if (timer_create(CLOCK_REALTIME, &timer_sigevent, &real_timer) == 0) {
+            if (timer_create(CLOCK_MONOTONIC, &timer_sigevent, &mono_timer) != 0) {
+                timer_delete(real_timer);
+                throw std::system_error(errno, std::system_category());
+            }
+        }
+        else {
+            throw std::system_error(errno, std::system_category());
+        }
+
+        Base::init(loop_mech);
+    }
+
+    void addTimer(timer_handle_t &h, void *userdata, clock_type clock = clock_type::MONOTONIC)
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        queue_for_clock(clock).allocate(h, userdata);
+    }
+
+    void removeTimer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        removeTimer_nolock(timer_id, clock);
+    }
+
+    void removeTimer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
+    {
+        timer_queue_t &timer_queue = queue_for_clock(clock);
+
+        if (timer_queue.is_queued(timer_id)) {
+            timer_queue.remove(timer_id);
+        }
+        timer_queue.deallocate(timer_id);
+    }
+
+    // starts (if not started) a timer to timeout at the given time. Resets the expiry count to 0.
+    //   enable: specifies whether to enable reporting of timeouts/intervals
+    void setTimer(timer_handle_t &timer_id, struct timespec &timeout, struct timespec &interval,
+            bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
+        timer_queue_t &timer_queue = queue_for_clock(clock);
+        timer_t &timer = timer_for_clock(clock);
+
+        auto &ts = timer_queue.node_data(timer_id);
+        ts.interval_time = interval;
+        ts.expiry_count = 0;
+        ts.enabled = enable;
+
+        if (timer_queue.is_queued(timer_id)) {
+            // Already queued; alter timeout
+            if (timer_queue.set_priority(timer_id, timeout)) {
+                set_timer_from_queue(timer, timer_queue);
+            }
+        }
+        else {
+            if (timer_queue.insert(timer_id, timeout)) {
+                set_timer_from_queue(timer, timer_queue);
+            }
+        }
+    }
+
+    // Set timer relative to current time:
+    void setTimerRel(timer_handle_t &timer_id, struct timespec &timeout, struct timespec &interval,
+            bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
+    {
+        // TODO consider caching current time somehow; need to decide then when to update cached value.
+        struct timespec curtime;
+        int posix_clock_id = (clock == clock_type::MONOTONIC) ? CLOCK_MONOTONIC : CLOCK_REALTIME;
+        clock_gettime(posix_clock_id, &curtime);
+        curtime.tv_sec += timeout.tv_sec;
+        curtime.tv_nsec += timeout.tv_nsec;
+        if (curtime.tv_nsec > 1000000000) {
+            curtime.tv_nsec -= 1000000000;
+            curtime.tv_sec++;
+        }
+        setTimer(timer_id, curtime, interval, enable, clock);
+    }
+
+    // Enables or disabling report of timeouts (does not stop timer)
+    void enableTimer(timer_handle_t &timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        enableTimer_nolock(timer_id, enable, clock);
+    }
+
+    void enableTimer_nolock(timer_handle_t &timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
+    {
+        timer_queue_t &timer_queue = queue_for_clock(clock);
+
+        auto &node_data = timer_queue.node_data(timer_id);
+        auto expiry_count = node_data.expiry_count;
+        if (expiry_count != 0) {
+            node_data.expiry_count = 0;
+            Base::receiveTimerExpiry(timer_id, node_data.userdata, expiry_count);
+        }
+        else {
+            timer_queue.node_data(timer_id).enabled = enable;
+        }
+    }
+
+    void stop_timer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
+    {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+        stop_timer_nolock(timer_id, clock);
+    }
+
+    void stop_timer_nolock(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
+    {
+        timer_queue_t &timer_queue = queue_for_clock(clock);
+        timer_t &timer = timer_for_clock(clock);
+
+        if (timer_queue.is_queued(timer_id)) {
+            bool was_first = (&timer_queue.get_root()) == &timer_id;
+            timer_queue.remove(timer_id);
+            if (was_first) {
+                set_timer_from_queue(timer, timer_queue);
+            }
+        }
+    }
+
+    void get_time(timespec &ts, clock_type clock, bool force_update) noexcept
+    {
+        int posix_clock_id = (clock == clock_type::MONOTONIC) ? CLOCK_MONOTONIC : CLOCK_REALTIME;
+        clock_gettime(posix_clock_id, &ts);
+    }
+
+    ~PosixTimerEvents()
+    {
+        timer_delete(mono_timer);
+        timer_delete(real_timer);
+    }
+};
+
+}
index e660efe71ed02aa3ebd307ef49057e7ffeaa1c03..8e49272a8988e9f2763c0d501e9dbd209a6e3fdb 100644 (file)
@@ -241,6 +241,12 @@ template <class Base> class TimerFdEvents : public timer_base<Base>
         }
     }
 
+    void get_time(timespec &ts, clock_type clock, bool force_update) noexcept
+    {
+        int posix_clock_id = (clock == clock_type::MONOTONIC) ? CLOCK_MONOTONIC : CLOCK_REALTIME;
+        clock_gettime(posix_clock_id, &ts);
+    }
+
     ~TimerFdEvents()
     {
         close(timerfd_fd);
index 715841a9197ee86ee15c764d7e1596160b031682..836139e93ea2ba0b2a38b05f0acdc22cb401c47b 100644 (file)
 // Loop and LoopTraits defined already; used for testing
 #elif defined(DASYNQ_HAVE_KQUEUE)
 #include "dasynq-kqueue.h"
+#if _POSIX_TIMERS > 0
+#include "dasynq-posixtimer.h"
+namespace dasynq {
+    template <typename T> using TimerEvents = PosixTimerEvents<T>;
+}
+#else
 #include "dasynq-itimer.h"
+namespace dasynq {
+    template <typename T> using TimerEvents = ITimerEvents<T>;
+}
+#endif
 #include "dasynq-childproc.h"
 namespace dasynq {
-    template <typename T> using Loop = KqueueLoop<interrupt_channel<ITimerEvents<ChildProcEvents<T>>>>;
+    template <typename T> using Loop = KqueueLoop<interrupt_channel<TimerEvents<ChildProcEvents<T>>>>;
     using LoopTraits = KqueueTraits;
 }
 #elif defined(DASYNQ_HAVE_EPOLL)
@@ -69,7 +79,7 @@ namespace dasynq {
 
 namespace dasynq {
 
-#ifdef __APPLE__
+#if HAVE_PIPE2 == 0
 inline int pipe2(int filedes[2], int flags)
 {
     if (pipe(filedes) == -1) {
@@ -91,10 +101,10 @@ inline int pipe2(int filedes[2], int flags)
 #endif
 
 namespace dprivate {
-    class BaseWatcher;
+    class base_watcher;
 }
 
-using PrioQueue = NaryHeap<dprivate::BaseWatcher *, int>;
+using PrioQueue = NaryHeap<dprivate::base_watcher *, int>;
 
 inline namespace {
     constexpr int DEFAULT_PRIORITY = 50;
@@ -138,7 +148,7 @@ namespace dprivate {
     template <typename, typename> class child_proc_watcher_impl;
     template <typename, typename> class timer_impl;
 
-    enum class WatchType
+    enum class watch_type_t
     {
         SIGNAL,
         FD,
@@ -155,22 +165,25 @@ namespace dprivate {
     constexpr static int multi_watch = 4;
     
     // Represents a queued event notification. Various event watchers derive from this type.
-    class BaseWatcher
+    class base_watcher
     {
         template <typename T_Mutex, typename Traits> friend class EventDispatch;
         template <typename T_Mutex, template <typename> class, typename> friend class dasynq::event_loop;
-        friend inline void basewatcher_set_active(BaseWatcher &watcher, bool active);
-        friend inline bool basewatcher_get_deleteme(const BaseWatcher &watcher);
+        friend inline void basewatcher_set_active(base_watcher &watcher, bool active);
+        friend inline bool basewatcher_get_deleteme(const base_watcher &watcher);
+        friend inline bool basewatcher_get_emulatefd(const base_watcher &watcher);
         
         protected:
-        WatchType watchType;
+        watch_type_t watchType;
         int active : 1;    // currently executing handler?
         int deleteme : 1;  // delete when handler finished?
+        int emulatefd : 1; // emulate file watch (by re-queueing)
+        int emulate_enabled : 1;   // whether an emulated watch is enabled
         
         PrioQueue::handle_t heap_handle;
         int priority;
         
-        static void set_priority(BaseWatcher &p, int prio)
+        static void set_priority(base_watcher &p, int prio)
         {
             p.priority = prio;
         }
@@ -182,16 +195,18 @@ namespace dprivate {
         {
             active = false;
             deleteme = false;
+            emulatefd = false;
+            emulate_enabled = false;
             PrioQueue::init_handle(heap_handle);
             priority = DEFAULT_PRIORITY;
         }
         
-        BaseWatcher(WatchType wt) noexcept : watchType(wt) { }
+        base_watcher(watch_type_t wt) noexcept : watchType(wt) { }
         
         virtual void dispatch(void *loop_ptr) noexcept { };
         virtual void dispatch_second(void *loop_ptr) noexcept { }
 
-        virtual ~BaseWatcher() noexcept { }
+        virtual ~base_watcher() noexcept { }
         
         // Called when the watcher has been removed.
         // It is guaranteed by the caller that:
@@ -204,26 +219,31 @@ namespace dprivate {
         }
     };
     
-    inline void basewatcher_set_active(BaseWatcher &watcher, bool active)
+    inline void basewatcher_set_active(base_watcher &watcher, bool active)
     {
         watcher.active = active;
     }
 
-    inline bool basewatcher_get_deleteme(const BaseWatcher &watcher)
+    inline bool basewatcher_get_deleteme(const base_watcher &watcher)
     {
         return watcher.deleteme;
     }
 
+    inline bool basewatcher_get_emulatefd(const base_watcher &watcher)
+    {
+        return watcher.emulatefd;
+    }
+
     // Base signal event - not part of public API
     template <typename T_Mutex, typename Traits>
-    class BaseSignalWatcher : public BaseWatcher
+    class base_signal_watcher : public base_watcher
     {
         friend class EventDispatch<T_Mutex, Traits>;
         template <typename, template <typename> class, typename> friend class dasynq::event_loop;
 
         protected:
         typename Traits::SigInfo siginfo;
-        BaseSignalWatcher() : BaseWatcher(WatchType::SIGNAL) { }
+        base_signal_watcher() : base_watcher(watch_type_t::SIGNAL) { }
 
         public:
         using siginfo_t = typename Traits::SigInfo;
@@ -231,7 +251,7 @@ namespace dprivate {
     };
     
     template <typename T_Mutex>
-    class BaseFdWatcher : public BaseWatcher
+    class base_fd_watcher : public base_watcher
     {
         template <typename, typename Traits> friend class EventDispatch;
         template <typename, template <typename> class, typename> friend class dasynq::event_loop;
@@ -248,11 +268,11 @@ namespace dprivate {
         //              the events that the watcher is currently watching (i.e. specifies which
         //              halves of the Bidi watcher are enabled).
 
-        BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { }
+        base_fd_watcher() noexcept : base_watcher(watch_type_t::FD) { }
     };
     
     template <typename T_Mutex>
-    class BaseBidiFdWatcher : public BaseFdWatcher<T_Mutex>
+    class base_bidi_fd_watcher : public base_fd_watcher<T_Mutex>
     {
         template <typename, typename Traits> friend class EventDispatch;
         template <typename, template <typename> class, typename> friend class dasynq::event_loop;
@@ -261,14 +281,14 @@ namespace dprivate {
         
         // The main instance is the "input" watcher only; we keep a secondary watcher
         // with a secondary set of flags for the "output" watcher:
-        BaseWatcher outWatcher {WatchType::SECONDARYFD};
+        base_watcher outWatcher {watch_type_t::SECONDARYFD};
         
         int read_removed : 1; // read watch removed?
         int write_removed : 1; // write watch removed?
     };
     
     template <typename T_Mutex>
-    class BaseChildWatcher : public BaseWatcher
+    class base_child_watcher : public base_watcher
     {
         template <typename, typename Traits> friend class EventDispatch;
         template <typename, template <typename> class, typename> friend class dasynq::event_loop;
@@ -277,12 +297,12 @@ namespace dprivate {
         pid_t watch_pid;
         int child_status;
         
-        BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { }
+        base_child_watcher() : base_watcher(watch_type_t::CHILD) { }
     };
     
 
     template <typename T_Mutex>
-    class BaseTimerWatcher : public BaseWatcher
+    class base_timer_watcher : public base_watcher
     {
         template <typename, typename Traits> friend class EventDispatch;
         template <typename, template <typename> class, typename> friend class dasynq::event_loop;
@@ -292,7 +312,7 @@ namespace dprivate {
         int intervals;
         clock_type clock;
 
-        BaseTimerWatcher() : BaseWatcher(WatchType::TIMER)
+        base_timer_watcher() : base_watcher(watch_type_t::TIMER)
         {
             init_timer_handle(timer_handle);
         }
@@ -424,7 +444,7 @@ namespace dprivate {
     
     // Do standard post-dispatch processing for a watcher. This handles the case of removing or
     // re-queing watchers depending on the rearm type.
-    template <typename Loop> void post_dispatch(Loop &loop, BaseWatcher *watcher, rearm rearmType)
+    template <typename Loop> void post_dispatch(Loop &loop, base_watcher *watcher, rearm rearmType)
     {
         if (rearmType == rearm::REMOVE) {
             loop.getBaseLock().unlock();
@@ -432,7 +452,7 @@ namespace dprivate {
             loop.getBaseLock().lock();
         }
         else if (rearmType == rearm::REQUEUE) {
-            loop.requeueWatcher(watcher);
+            loop.requeue_watcher(watcher);
         }
     }
 
@@ -447,30 +467,30 @@ namespace dprivate {
         // queue data structure/pointer
         PrioQueue event_queue;
         
-        using BaseSignalWatcher = dasynq::dprivate::BaseSignalWatcher<T_Mutex,Traits>;
-        using BaseFdWatcher = dasynq::dprivate::BaseFdWatcher<T_Mutex>;
-        using BaseBidiFdWatcher = dasynq::dprivate::BaseBidiFdWatcher<T_Mutex>;
-        using BaseChildWatcher = dasynq::dprivate::BaseChildWatcher<T_Mutex>;
-        using BaseTimerWatcher = dasynq::dprivate::BaseTimerWatcher<T_Mutex>;
+        using BaseSignalWatcher = dasynq::dprivate::base_signal_watcher<T_Mutex,Traits>;
+        using BaseFdWatcher = dasynq::dprivate::base_fd_watcher<T_Mutex>;
+        using BaseBidiFdWatcher = dasynq::dprivate::base_bidi_fd_watcher<T_Mutex>;
+        using BaseChildWatcher = dasynq::dprivate::base_child_watcher<T_Mutex>;
+        using BaseTimerWatcher = dasynq::dprivate::base_timer_watcher<T_Mutex>;
         
         // Add a watcher into the queuing system (but don't queue it)
         //   may throw: std::bad_alloc
-        void prepare_watcher(BaseWatcher *bwatcher)
+        void prepare_watcher(base_watcher *bwatcher)
         {
             event_queue.allocate(bwatcher->heap_handle, bwatcher);
         }
         
-        void queueWatcher(BaseWatcher *bwatcher) noexcept
+        void queueWatcher(base_watcher *bwatcher) noexcept
         {
             event_queue.insert(bwatcher->heap_handle, bwatcher->priority);
         }
         
-        bool isQueued(BaseWatcher *bwatcher) noexcept
+        bool isQueued(base_watcher *bwatcher) noexcept
         {
             return event_queue.is_queued(bwatcher->heap_handle);
         }
 
-        void dequeueWatcher(BaseWatcher *bwatcher) noexcept
+        void dequeueWatcher(base_watcher *bwatcher) noexcept
         {
             if (event_queue.is_queued(bwatcher->heap_handle)) {
                 event_queue.remove(bwatcher->heap_handle);
@@ -478,7 +498,7 @@ namespace dprivate {
         }
 
         // Remove watcher from the queueing system
-        void release_watcher(BaseWatcher *bwatcher) noexcept
+        void release_watcher(base_watcher *bwatcher) noexcept
         {
             event_queue.deallocate(bwatcher->heap_handle);
         }
@@ -490,7 +510,7 @@ namespace dprivate {
         
         // Receive a signal; return true to disable signal watch or false to leave enabled
         template <typename T>
-        bool receiveSignal(T &loop_mech, typename Traits::SigInfo & siginfo, void * userdata) noexcept
+        bool receive_signal(T &loop_mech, typename Traits::SigInfo & siginfo, void * userdata) noexcept
         {
             BaseSignalWatcher * bwatcher = static_cast<BaseSignalWatcher *>(userdata);
             bwatcher->siginfo = siginfo;
@@ -505,7 +525,7 @@ namespace dprivate {
             
             bfdw->event_flags |= flags;
             
-            BaseWatcher * bwatcher = bfdw;
+            base_watcher * bwatcher = bfdw;
             
             bool is_multi_watch = bfdw->watch_flags & multi_watch;
             if (is_multi_watch) {                
@@ -551,19 +571,19 @@ namespace dprivate {
         }
         
         // Pull a single event from the queue; returns nullptr if the queue is empty.
-        BaseWatcher * pullEvent() noexcept
+        base_watcher * pullEvent() noexcept
         {
             if (event_queue.empty()) {
                 return nullptr;
             }
             
             auto & rhndl = event_queue.get_root();
-            BaseWatcher *r = event_queue.node_data(rhndl);
+            base_watcher *r = event_queue.node_data(rhndl);
             event_queue.pull_root();
             return r;
         }
         
-        void issueDelete(BaseWatcher *watcher) noexcept
+        void issueDelete(base_watcher *watcher) noexcept
         {
             // This is only called when the attention lock is held, so if the watcher is not
             // active/queued now, it cannot become active (and will not be reported with an event)
@@ -602,7 +622,7 @@ namespace dprivate {
                 watcher->read_removed = true;
             }
             
-            BaseWatcher *secondary = &(watcher->outWatcher);
+            base_watcher *secondary = &(watcher->outWatcher);
             if (secondary->active) {
                 secondary->deleteme = true;
                 release_watcher(watcher);
@@ -639,7 +659,7 @@ class event_loop
     friend class dprivate::timer<my_event_loop_t>;
     
     friend void dprivate::post_dispatch<my_event_loop_t>(my_event_loop_t &loop,
-            dprivate::BaseWatcher *watcher, rearm rearmType);
+            dprivate::base_watcher *watcher, rearm rearmType);
 
     template <typename, typename> friend class dprivate::fd_watcher_impl;
     template <typename, typename> friend class dprivate::bidi_fd_watcher_impl;
@@ -654,13 +674,13 @@ class event_loop
     template <typename T, typename U> using EventDispatch = dprivate::EventDispatch<T,U>;
     template <typename T> using waitqueue = dprivate::waitqueue<T>;
     template <typename T> using waitqueue_node = dprivate::waitqueue_node<T>;
-    using BaseWatcher = dprivate::BaseWatcher;
-    using BaseSignalWatcher = dprivate::BaseSignalWatcher<T_Mutex,LoopTraits>;
-    using BaseFdWatcher = dprivate::BaseFdWatcher<T_Mutex>;
-    using BaseBidiFdWatcher = dprivate::BaseBidiFdWatcher<T_Mutex>;
-    using BaseChildWatcher = dprivate::BaseChildWatcher<T_Mutex>;
-    using BaseTimerWatcher = dprivate::BaseTimerWatcher<T_Mutex>;
-    using WatchType = dprivate::WatchType;
+    using BaseWatcher = dprivate::base_watcher;
+    using BaseSignalWatcher = dprivate::base_signal_watcher<T_Mutex,LoopTraits>;
+    using BaseFdWatcher = dprivate::base_fd_watcher<T_Mutex>;
+    using BaseBidiFdWatcher = dprivate::base_bidi_fd_watcher<T_Mutex>;
+    using BaseChildWatcher = dprivate::base_child_watcher<T_Mutex>;
+    using BaseTimerWatcher = dprivate::base_timer_watcher<T_Mutex>;
+    using watch_type_t = dprivate::watch_type_t;
     
     Loop<EventDispatch<T_Mutex, LoopTraits>> loop_mech;
 
@@ -734,19 +754,28 @@ class event_loop
         loop_mech.removeSignalWatch(signo);
         
         waitqueue_node<T_Mutex> qnode;
-        getAttnLock(qnode);
+        get_attn_lock(qnode);
         
         EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
         ed.issueDelete(callBack);
         
-        releaseLock(qnode);
+        release_lock(qnode);
     }
 
-    void registerFd(BaseFdWatcher *callback, int fd, int eventmask, bool enabled)
+    void registerFd(BaseFdWatcher *callback, int fd, int eventmask, bool enabled, bool emulate = false)
     {
         loop_mech.prepare_watcher(callback);
         try {
-            loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, enabled);
+            if (! loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, enabled, emulate)) {
+                callback->emulatefd = true;
+                callback->emulate_enabled = enabled;
+                if (enabled) {
+                    callback->event_flags = eventmask & IO_EVENTS;
+                    if (eventmask & IO_EVENTS) {
+                        requeue_watcher(callback);
+                    }
+                }
+            }
         }
         catch (...) {
             loop_mech.release_watcher(callback);
@@ -754,17 +783,38 @@ class event_loop
         }
     }
     
-    void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask)
+    void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask, bool emulate = false)
     {
         loop_mech.prepare_watcher(callback);
         try {
             loop_mech.prepare_watcher(&callback->outWatcher);
             try {
                 if (LoopTraits::has_separate_rw_fd_watches) {
-                    loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT);
+                    int r = loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT, emulate);
+                    if (r & IN_EVENTS) {
+                        callback->emulatefd = true;
+                        if (eventmask & IN_EVENTS) {
+                            requeue_watcher(callback);
+                        }
+                    }
+                    if (r & OUT_EVENTS) {
+                        callback->outWatcher.emulatefd = true;
+                        if (eventmask & OUT_EVENTS) {
+                            requeue_watcher(&callback->outWatcher);
+                        }
+                    }
                 }
                 else {
-                    loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT);
+                    if (! loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, true, emulate)) {
+                        callback->emulatefd = true;
+                        callback->outWatcher.emulatefd = true;
+                        if (eventmask & IN_EVENTS) {
+                            requeue_watcher(callback);
+                        }
+                        if (eventmask & OUT_EVENTS) {
+                            requeue_watcher(&callback->outWatcher);
+                        }
+                    }
                 }
             }
             catch (...) {
@@ -800,15 +850,21 @@ class event_loop
     
     void deregister(BaseFdWatcher *callback, int fd) noexcept
     {
-        loop_mech.removeFdWatch(fd, callback->watch_flags);
+        if (callback->emulatefd) {
+            auto & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
+            ed.issueDelete(callback);
+            return;
+        }
         
+        loop_mech.removeFdWatch(fd, callback->watch_flags);
+
         waitqueue_node<T_Mutex> qnode;
-        getAttnLock(qnode);
+        get_attn_lock(qnode);
         
-        EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
+        auto & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
         ed.issueDelete(callback);
         
-        releaseLock(qnode);        
+        release_lock(qnode);        
     }
     
     void deregister(BaseBidiFdWatcher *callback, int fd) noexcept
@@ -821,12 +877,12 @@ class event_loop
         }
         
         waitqueue_node<T_Mutex> qnode;
-        getAttnLock(qnode);
+        get_attn_lock(qnode);
         
         EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
         ed.issueDelete(callback);
         
-        releaseLock(qnode);
+        release_lock(qnode);
     }
     
     void reserveChildWatch(BaseChildWatcher *callback)
@@ -874,12 +930,12 @@ class event_loop
         loop_mech.removeChildWatch(child);
 
         waitqueue_node<T_Mutex> qnode;
-        getAttnLock(qnode);
+        get_attn_lock(qnode);
         
         EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
         ed.issueDelete(callback);
         
-        releaseLock(qnode);
+        release_lock(qnode);
     }
     
     void registerTimer(BaseTimerWatcher *callback, clock_type clock)
@@ -925,20 +981,20 @@ class event_loop
         loop_mech.removeTimer(callback->timer_handle, clock);
         
         waitqueue_node<T_Mutex> qnode;
-        getAttnLock(qnode);
+        get_attn_lock(qnode);
         
         EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
         ed.issueDelete(callback);
         
-        releaseLock(qnode);
+        release_lock(qnode);
     }
     
-    void dequeueWatcher(BaseWatcher *watcher) noexcept
+    void dequeue_watcher(BaseWatcher *watcher) noexcept
     {
         loop_mech.dequeueWatcher(watcher);
     }
 
-    void requeueWatcher(BaseWatcher *watcher) noexcept
+    void requeue_watcher(BaseWatcher *watcher) noexcept
     {
         loop_mech.queueWatcher(watcher);
     }
@@ -946,7 +1002,7 @@ class event_loop
     // Acquire the attention lock (when held, ensures that no thread is polling the AEN
     // mechanism). This can be used to safely remove watches, since it is certain that
     // notification callbacks won't be run while the attention lock is held.
-    void getAttnLock(waitqueue_node<T_Mutex> &qnode) noexcept
+    void get_attn_lock(waitqueue_node<T_Mutex> &qnode) noexcept
     {
         std::unique_lock<T_Mutex> ulock(wait_lock);
         attn_waitqueue.queue(&qnode);        
@@ -962,7 +1018,7 @@ class event_loop
     // the attention lock). The poll-wait lock is used to prevent more than a single thread from
     // polling the event loop mechanism at a time; if this is not done, it is basically
     // impossible to safely deregister watches.
-    void getPollwaitLock(waitqueue_node<T_Mutex> &qnode) noexcept
+    void get_pollwait_lock(waitqueue_node<T_Mutex> &qnode) noexcept
     {
         std::unique_lock<T_Mutex> ulock(wait_lock);
         if (attn_waitqueue.isEmpty()) {
@@ -979,7 +1035,7 @@ class event_loop
     }
     
     // Release the poll-wait/attention lock.
-    void releaseLock(waitqueue_node<T_Mutex> &qnode) noexcept
+    void release_lock(waitqueue_node<T_Mutex> &qnode) noexcept
     {
         std::unique_lock<T_Mutex> ulock(wait_lock);
         waitqueue_node<T_Mutex> * nhead = attn_waitqueue.unqueue();
@@ -1011,29 +1067,37 @@ class event_loop
     // Process rearm return for fd_watcher, including the primary watcher of a bidi_fd_watcher
     rearm processFdRearm(BaseFdWatcher * bfw, rearm rearmType, bool is_multi_watch) noexcept
     {
+        bool emulatedfd = static_cast<BaseWatcher *>(bfw)->emulatefd;
+
         // Called with lock held
         if (is_multi_watch) {
             BaseBidiFdWatcher * bdfw = static_cast<BaseBidiFdWatcher *>(bfw);
-            
+
             if (rearmType == rearm::REMOVE) {
                 bdfw->read_removed = 1;
                 
                 if (LoopTraits::has_separate_rw_fd_watches) {
                     bdfw->watch_flags &= ~IN_EVENTS;
-                    loop_mech.removeFdWatch_nolock(bdfw->watch_fd, IN_EVENTS);
+                    if (! emulatedfd) {
+                        loop_mech.removeFdWatch_nolock(bdfw->watch_fd, IN_EVENTS);
+                    }
                     return bdfw->write_removed ? rearm::REMOVE : rearm::NOOP;
                 }
                 else {
                     if (! bdfw->write_removed) {
                         if (bdfw->watch_flags & IN_EVENTS) {
                             bdfw->watch_flags &= ~IN_EVENTS;
-                            loop_mech.enableFdWatch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags);
+                            if (! emulatedfd) {
+                                loop_mech.enableFdWatch_nolock(bdfw->watch_fd, bdfw, bdfw->watch_flags);
+                            }
                         }
                         return rearm::NOOP;
                     }
                     else {
                         // both removed: actually remove
-                        loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */);
+                        if (! emulatedfd) {
+                            loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */);
+                        }
                         return rearm::REMOVE;
                     }
                 }
@@ -1041,37 +1105,65 @@ class event_loop
             else if (rearmType == rearm::DISARM) {
                 bdfw->watch_flags &= ~IN_EVENTS;
 
-                if (! LoopTraits::has_separate_rw_fd_watches) {
-                    int watch_flags = bdfw->watch_flags;
-                    // without separate r/w watches, enableFdWatch actually sets
-                    // which sides are enabled (i.e. can be used to disable):
-                    loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
-                            static_cast<BaseWatcher *>(bdfw),
-                            (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
-                }
-                else {
-                    loop_mech.disableFdWatch_nolock(bdfw->watch_fd, IN_EVENTS);
+                if (! emulatedfd) {
+                    if (! LoopTraits::has_separate_rw_fd_watches) {
+                        int watch_flags = bdfw->watch_flags;
+                        // without separate r/w watches, enableFdWatch actually sets
+                        // which sides are enabled (i.e. can be used to disable):
+                        loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+                                static_cast<BaseWatcher *>(bdfw),
+                                (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+                    }
+                    else {
+                        loop_mech.disableFdWatch_nolock(bdfw->watch_fd, IN_EVENTS);
+                    }
                 }
             }
             else if (rearmType == rearm::REARM) {
                 bdfw->watch_flags |= IN_EVENTS;
                 
-                if (! LoopTraits::has_separate_rw_fd_watches) {
-                    int watch_flags = bdfw->watch_flags;
-                    loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
-                            static_cast<BaseWatcher *>(bdfw),
-                            (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+                if (! emulatedfd) {
+                    if (! LoopTraits::has_separate_rw_fd_watches) {
+                        int watch_flags = bdfw->watch_flags;
+                        loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+                                static_cast<BaseWatcher *>(bdfw),
+                                (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+                    }
+                    else {
+                        loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+                                static_cast<BaseWatcher *>(bdfw),
+                                IN_EVENTS | ONE_SHOT);
+                    }
                 }
                 else {
-                    loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
-                            static_cast<BaseWatcher *>(bdfw),
-                            IN_EVENTS | ONE_SHOT);
+                    rearmType = rearm::REQUEUE;
+                }
+            }
+            else if (rearmType == rearm::NOOP) {
+                if (bdfw->emulatefd) {
+                    if (bdfw->watch_flags & IN_EVENTS) {
+                        rearmType = rearm::REQUEUE;
+                    }
                 }
             }
             return rearmType;
         }
         else { // Not multi-watch:
-            if (rearmType == rearm::REARM) {
+            if (emulatedfd) {
+                if (rearmType == rearm::REARM) {
+                    bfw->emulate_enabled = true;
+                    rearmType = rearm::REQUEUE;
+                }
+                else if (rearmType == rearm::DISARM) {
+                    bfw->emulate_enabled = false;
+                }
+                else if (rearmType == rearm::NOOP) {
+                    if (bfw->emulate_enabled) {
+                        rearmType = rearm::REQUEUE;
+                    }
+                }
+            }
+            else  if (rearmType == rearm::REARM) {
                 loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw,
                         (bfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
             }
@@ -1086,12 +1178,34 @@ class event_loop
     }
 
     // Process re-arm for the secondary (output) watcher in a Bi-direction Fd watcher.
-    rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, rearm rearmType) noexcept
+    rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, BaseWatcher * outw, rearm rearmType) noexcept
     {
+        bool emulatedfd = outw->emulatefd;
+
         // Called with lock held
-        if (rearmType == rearm::REMOVE) {
+        if (emulatedfd) {
+            if (rearmType == rearm::REMOVE) {
+                bdfw->write_removed = 1;
+                bdfw->watch_flags &= ~OUT_EVENTS;
+                rearmType = bdfw->read_removed ? rearm::REMOVE : rearm::NOOP;
+            }
+            else if (rearmType == rearm::DISARM) {
+                bdfw->watch_flags &= ~OUT_EVENTS;
+            }
+            else if (rearmType == rearm::REARM) {
+                bdfw->watch_flags |= OUT_EVENTS;
+                rearmType = rearm::REQUEUE;
+            }
+            else if (rearmType == rearm::NOOP) {
+                if (bdfw->watch_flags & OUT_EVENTS) {
+                    rearmType = rearm::REQUEUE;
+                }
+            }
+            return rearmType;
+        }
+        else if (rearmType == rearm::REMOVE) {
             bdfw->write_removed = 1;
-            
+
             if (LoopTraits::has_separate_rw_fd_watches) {
                 bdfw->watch_flags &= ~OUT_EVENTS;
                 loop_mech.removeFdWatch_nolock(bdfw->watch_fd, OUT_EVENTS);
@@ -1178,10 +1292,13 @@ class event_loop
             
             // (Above variables are initialised only to silence compiler warnings).
             
-            if (pqueue->watchType == WatchType::SECONDARYFD) {
+            if (pqueue->watchType == watch_type_t::SECONDARYFD) {
                 // construct a pointer to the main watcher:
                 char * rp = (char *)pqueue;
+                _Pragma ("GCC diagnostic push")
+                _Pragma ("GCC diagnostic ignored \"-Winvalid-offsetof\"")
                 rp -= offsetof(BaseBidiFdWatcher, outWatcher);
+                _Pragma ("GCC diagnostic pop")
                 bbfw = (BaseBidiFdWatcher *)rp;
 
                 // issue a secondary dispatch:
@@ -1198,7 +1315,6 @@ class event_loop
         return active;
     }
 
-    
     public:
     using mutex_t = T_Mutex;
     
@@ -1214,32 +1330,45 @@ class event_loop
     template <typename D> using child_proc_watcher_impl = dprivate::child_proc_watcher_impl<my_event_loop_t, D>;
     template <typename D> using timer_impl = dprivate::timer_impl<my_event_loop_t, D>;
 
+    // Poll the event loop and process any pending events. If no events are pending, wait
+    // for and process at least one event.
     void run() noexcept
     {
         // Poll the mechanism first, in case high-priority events are pending:
         waitqueue_node<T_Mutex> qnode;
-        getPollwaitLock(qnode);
+        get_pollwait_lock(qnode);
         loop_mech.pullEvents(false);
-        releaseLock(qnode);
+        release_lock(qnode);
 
         while (! processEvents()) {
             // Pull events from the AEN mechanism and insert them in our internal queue:
-            getPollwaitLock(qnode);
+            get_pollwait_lock(qnode);
             loop_mech.pullEvents(true);
-            releaseLock(qnode);
+            release_lock(qnode);
         }
     }
 
+    // Poll the event loop and process any pending events
     void poll() noexcept
     {
-        // Poll the mechanism first, in case high-priority events are pending:
         waitqueue_node<T_Mutex> qnode;
-        getPollwaitLock(qnode);
+        get_pollwait_lock(qnode);
         loop_mech.pullEvents(false);
-        releaseLock(qnode);
+        release_lock(qnode);
 
         processEvents();
     }
+
+    // Get the current time corresponding to a specific clock.
+    //   ts - the timespec variable to receive the time
+    //   clock - specifies the clock
+    //   force_update (default = false) - if true, the time returned will be updated from
+    //       the system rather than being a previously cached result. It may be more
+    //       accurate, but note that reading from a system clock may be relatively expensive.
+    void get_time(timespec &ts, clock_type clock, bool force_update = false) noexcept
+    {
+        loop_mech.get_time(ts, clock, force_update);
+    }
 };
 
 typedef event_loop<null_mutex> event_loop_n;
@@ -1249,15 +1378,15 @@ namespace dprivate {
 
 // Posix signal event watcher
 template <typename EventLoop>
-class signal_watcher : private dprivate::BaseSignalWatcher<typename EventLoop::mutex_t, typename EventLoop::loop_traits_t>
+class signal_watcher : private dprivate::base_signal_watcher<typename EventLoop::mutex_t, typename EventLoop::loop_traits_t>
 {
     template <typename, typename> friend class signal_watcher_impl;
 
-    using BaseWatcher = dprivate::BaseWatcher;
+    using BaseWatcher = dprivate::base_watcher;
     using T_Mutex = typename EventLoop::mutex_t;
     
     public:
-    using siginfo_p = typename dprivate::BaseSignalWatcher<T_Mutex, typename EventLoop::loop_traits_t>::siginfo_p;
+    using siginfo_p = typename dprivate::base_signal_watcher<T_Mutex, typename EventLoop::loop_traits_t>::siginfo_p;
 
     // Register this watcher to watch the specified signal.
     // If an attempt is made to register with more than one event loop at
@@ -1338,11 +1467,11 @@ class signal_watcher_impl : public signal_watcher<EventLoop>
 
 // Posix file descriptor event watcher
 template <typename EventLoop>
-class fd_watcher : private dprivate::BaseFdWatcher<typename EventLoop::mutex_t>
+class fd_watcher : private dprivate::base_fd_watcher<typename EventLoop::mutex_t>
 {
     template <typename, typename> friend class fd_watcher_impl;
 
-    using BaseWatcher = dprivate::BaseWatcher;
+    using BaseWatcher = dprivate::base_watcher;
     using T_Mutex = typename EventLoop::mutex_t;
 
     protected:
@@ -1377,7 +1506,16 @@ class fd_watcher : private dprivate::BaseFdWatcher<typename EventLoop::mutex_t>
         this->priority = prio;
         this->watch_fd = fd;
         this->watch_flags = flags;
-        eloop.registerFd(this, fd, flags, enabled);
+        eloop.registerFd(this, fd, flags, enabled, true);
+    }
+
+    void add_watch_noemu(EventLoop &eloop, int fd, int flags, bool enabled = true, int prio = DEFAULT_PRIORITY)
+    {
+        BaseWatcher::init();
+        this->priority = prio;
+        this->watch_fd = fd;
+        this->watch_flags = flags;
+        eloop.registerFd(this, fd, flags, enabled, false);
     }
     
     int get_watched_fd()
@@ -1401,9 +1539,14 @@ class fd_watcher : private dprivate::BaseFdWatcher<typename EventLoop::mutex_t>
     void set_enabled(EventLoop &eloop, bool enable) noexcept
     {
         std::lock_guard<T_Mutex> guard(eloop.getBaseLock());
-        eloop.setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable);
+        if (this->emulatefd) {
+            this->emulate_enabled = enable;
+        }
+        else {
+            eloop.setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable);
+        }
         if (! enable) {
-            eloop.dequeueWatcher(this);
+            eloop.dequeue_watcher(this);
         }
     }
     
@@ -1412,13 +1555,13 @@ class fd_watcher : private dprivate::BaseFdWatcher<typename EventLoop::mutex_t>
     template <typename T>
     static fd_watcher<EventLoop> *add_watch(EventLoop &eloop, int fd, int flags, T watchHndlr)
     {
-        class LambdaFdWatcher : public fd_watcher_impl<EventLoop, LambdaFdWatcher>
+        class lambda_fd_watcher : public fd_watcher_impl<EventLoop, lambda_fd_watcher>
         {
             private:
             T watchHndlr;
 
             public:
-            LambdaFdWatcher(T watchHandlr_a) : watchHndlr(watchHandlr_a)
+            lambda_fd_watcher(T watchHandlr_a) : watchHndlr(watchHandlr_a)
             {
                 //
             }
@@ -1434,7 +1577,7 @@ class fd_watcher : private dprivate::BaseFdWatcher<typename EventLoop::mutex_t>
             }
         };
         
-        LambdaFdWatcher * lfd = new LambdaFdWatcher(watchHndlr);
+        lambda_fd_watcher * lfd = new lambda_fd_watcher(watchHndlr);
         lfd->add_watch(eloop, fd, flags);
         return lfd;
     }
@@ -1448,6 +1591,10 @@ class fd_watcher_impl : public fd_watcher<EventLoop>
     void dispatch(void *loop_ptr) noexcept override
     {
         EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
+
+        // In case emulating, clear enabled here; REARM or explicit set_enabled will re-enable.
+        this->emulate_enabled = false;
+
         loop.getBaseLock().unlock();
 
         auto rearmType = static_cast<Derived *>(this)->fd_event(loop, this->watch_fd, this->event_flags);
@@ -1474,11 +1621,11 @@ class fd_watcher_impl : public fd_watcher<EventLoop>
 // This watcher type has two event notification methods which can both potentially be
 // active at the same time.
 template <typename EventLoop>
-class bidi_fd_watcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::mutex_t>
+class bidi_fd_watcher : private dprivate::base_bidi_fd_watcher<typename EventLoop::mutex_t>
 {
     template <typename, typename> friend class bidi_fd_watcher_impl;
 
-    using BaseWatcher = dprivate::BaseWatcher;
+    using BaseWatcher = dprivate::base_watcher;
     using T_Mutex = typename EventLoop::mutex_t;
     
     void set_watch_enabled(EventLoop &eloop, bool in, bool b)
@@ -1491,22 +1638,23 @@ class bidi_fd_watcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::
         else {
             this->watch_flags &= ~events;
         }
-        if (EventLoop::loop_traits_t::has_separate_rw_fd_watches) {
-            dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher;
-            eloop.setFdEnabled_nolock(watcher, this->watch_fd, events | ONE_SHOT, b);
-            if (! b) {
-                eloop.dequeueWatcher(watcher);
+
+        dprivate::base_watcher * watcher = in ? this : &this->outWatcher;
+
+        if (! basewatcher_get_emulatefd(*watcher)) {
+            if (EventLoop::loop_traits_t::has_separate_rw_fd_watches) {
+                eloop.setFdEnabled_nolock(watcher, this->watch_fd, events | ONE_SHOT, b);
             }
-        }
-        else {
-            eloop.setFdEnabled_nolock(this, this->watch_fd,
-                    (this->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT,
-                    (this->watch_flags & (IN_EVENTS | OUT_EVENTS)) != 0);
-            if (! b) {
-                dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher;
-                eloop.dequeueWatcher(watcher);
+            else {
+                eloop.setFdEnabled_nolock(this, this->watch_fd,
+                        (this->watch_flags & IO_EVENTS) | ONE_SHOT,
+                        (this->watch_flags & IO_EVENTS) != 0);
             }
         }
+
+        if (! b) {
+            eloop.dequeue_watcher(watcher);
+        }
     }
     
     public:
@@ -1536,13 +1684,14 @@ class bidi_fd_watcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::
     void set_watches(EventLoop &eloop, int newFlags)
     {
         std::lock_guard<T_Mutex> guard(eloop.getBaseLock());
-        if (EventLoop::loop_traits_t::has_separate_rw_fd_watches) {
+        bool use_emulation = this->emulatefd || basewatcher_get_emulatefd(this->outWatcher);
+        if (use_emulation || EventLoop::loop_traits_t::has_separate_rw_fd_watches) {
             set_watch_enabled(eloop, true, (newFlags & IN_EVENTS) != 0);
             set_watch_enabled(eloop, false, (newFlags & OUT_EVENTS) != 0);
         }
         else {
             this->watch_flags = (this->watch_flags & ~IO_EVENTS) | newFlags;
-            eloop.setFdEnabled((dprivate::BaseWatcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true);
+            eloop.setFdEnabled((dprivate::base_watcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true);
         }
     }
     
@@ -1560,9 +1709,22 @@ class bidi_fd_watcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::
         this->write_removed = false;
         this->priority = inprio;
         this->set_priority(this->outWatcher, outprio);
-        eloop.registerFd(this, fd, flags);
+        eloop.registerFd(this, fd, flags, true);
     }
-    
+
+    void add_watch_noemu(EventLoop &eloop, int fd, int flags, int inprio = DEFAULT_PRIORITY, int outprio = DEFAULT_PRIORITY)
+    {
+        BaseWatcher::init();
+        this->outWatcher.BaseWatcher::init();
+        this->watch_fd = fd;
+        this->watch_flags = flags | dprivate::multi_watch;
+        this->read_removed = false;
+        this->write_removed = false;
+        this->priority = inprio;
+        this->set_priority(this->outWatcher, outprio);
+        eloop.registerFd(this, fd, flags, false);
+    }
+
     int get_watched_fd()
     {
         return this->watch_fd;
@@ -1626,6 +1788,7 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher<EventLoop>
     void dispatch(void *loop_ptr) noexcept override
     {
         EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
+        this->emulate_enabled = false;
         loop.getBaseLock().unlock();
 
         auto rearmType = static_cast<Derived *>(this)->read_ready(loop, this->watch_fd);
@@ -1665,7 +1828,7 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher<EventLoop>
                 rearmType = rearm::REMOVE;
             }
 
-            rearmType = loop.processSecondaryRearm(this, rearmType);
+            rearmType = loop.processSecondaryRearm(this, &outwatcher, rearmType);
 
             if (rearmType == rearm::REQUEUE) {
                 post_dispatch(loop, &outwatcher, rearmType);
@@ -1679,11 +1842,11 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher<EventLoop>
 
 // Child process event watcher
 template <typename EventLoop>
-class child_proc_watcher : private dprivate::BaseChildWatcher<typename EventLoop::mutex_t>
+class child_proc_watcher : private dprivate::base_child_watcher<typename EventLoop::mutex_t>
 {
     template <typename, typename> friend class child_proc_watcher_impl;
 
-    using BaseWatcher = dprivate::BaseWatcher;
+    using BaseWatcher = dprivate::base_watcher;
     using T_Mutex = typename EventLoop::mutex_t;
 
     public:
@@ -1738,8 +1901,11 @@ class child_proc_watcher : private dprivate::BaseChildWatcher<typename EventLoop
     // Returns:
     // - the child pid in the parent
     // - 0 in the child
-    pid_t fork(EventLoop &eloop, bool from_reserved = false)
+    pid_t fork(EventLoop &eloop, bool from_reserved = false, int prio = DEFAULT_PRIORITY)
     {
+        BaseWatcher::init();
+        this->priority = prio;
+
         if (EventLoop::loop_traits_t::supports_childwatch_reservation) {
             // Reserve a watch, fork, then claim reservation
             if (! from_reserved) {
@@ -1764,6 +1930,7 @@ class child_proc_watcher : private dprivate::BaseChildWatcher<typename EventLoop
             }
             
             // Register this watcher.
+            this->watch_pid = child;
             eloop.registerReservedChild_nolock(this, child);
             lock.unlock();
             return child;
@@ -1801,6 +1968,7 @@ class child_proc_watcher : private dprivate::BaseChildWatcher<typename EventLoop
             
             // Register this watcher.
             try {
+                this->watch_pid = child;
                 eloop.registerChild(this, child);
                 
                 // Continue in child (it doesn't matter what is written):
@@ -1827,7 +1995,7 @@ class child_proc_watcher_impl : public child_proc_watcher<EventLoop>
         EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
         loop.getBaseLock().unlock();
 
-        auto rearmType = static_cast<Derived *>(this)->child_status(loop, this->watch_pid, this->child_status);
+        auto rearmType = static_cast<Derived *>(this)->status_change(loop, this->watch_pid, this->child_status);
 
         loop.getBaseLock().lock();
 
@@ -1846,15 +2014,16 @@ class child_proc_watcher_impl : public child_proc_watcher<EventLoop>
 };
 
 template <typename EventLoop>
-class timer : private BaseTimerWatcher<typename EventLoop::mutex_t>
+class timer : private base_timer_watcher<typename EventLoop::mutex_t>
 {
     template <typename, typename> friend class timer_impl;
-    using base_t = BaseTimerWatcher<typename EventLoop::mutex_t>;
+    using base_t = base_timer_watcher<typename EventLoop::mutex_t>;
 
     public:
     
     void add_timer(EventLoop &eloop, clock_type clock = clock_type::MONOTONIC, int prio = DEFAULT_PRIORITY)
     {
+        base_watcher::init();
         this->priority = prio;
         this->clock = clock;
         eloop.registerTimer(this, clock);