Incorporate Dasynq library changes.
authorDavin McCall <davmac@davmac.org>
Wed, 26 Apr 2017 09:33:00 +0000 (10:33 +0100)
committerDavin McCall <davmac@davmac.org>
Wed, 26 Apr 2017 09:33:00 +0000 (10:33 +0100)
src/dasynq/dasynq-epoll.h
src/dasynq/dasynq-itimer.h
src/dasynq/dasynq-svec.h
src/dasynq/dasynq-timerfd.h
src/dasynq/dasynq.h

index 09775d21d3b751c6ecadd077b8209a0b7efb8897..c66e85e4ed8be1b80aec4941ed9e7d28a4fa03cd 100644 (file)
@@ -265,6 +265,7 @@ template <class Base> class EpollLoop : public Base
             // as we see them.
             if (epoll_ctl(epfd, EPOLL_CTL_ADD, sigfd, &epevent) == -1) {
                 close(sigfd);
+                sigfd = -1;
                 throw new std::system_error(errno, std::system_category());        
             }
         }
index f6b48ee975156c7a9f3dc690eb63115525acb494..4f1497843076b749a6128952782b5389dcb57c9a 100644 (file)
@@ -17,15 +17,11 @@ template <class Base> class ITimerEvents : public timer_base<Base>
 
     timer_queue_t timer_queue;
     
-#if defined(__APPLE__)
-#define itimerspec itimerval
-#endif
-    
     // Set the timerfd timeout to match the first timer in the queue (disable the timerfd
     // if there are no active timers).
     void set_timer_from_queue()
     {
-        struct itimerspec newtime;
+        struct timespec newtime;
         struct itimerval newalarm;
         if (timer_queue.empty()) {
             newalarm.it_value = {0, 0};
@@ -34,14 +30,7 @@ template <class Base> class ITimerEvents : public timer_base<Base>
             return;
         }
 
-#if defined(__APPLE__)
-        auto &rp = timer_queue.get_root_priority();
-        newtime.it_value.tv_sec = rp.tv_sec;
-        newtime.it_value.tv_usec = rp.tv_nsec / 1000;
-#else
-        newtime.it_value = timer_queue.get_root_priority();
-        newtime.it_interval = {0, 0};
-#endif
+        newtime = timer_queue.get_root_priority();
         
         struct timespec curtime;
 #if defined(__APPLE__)
@@ -53,22 +42,15 @@ template <class Base> class ITimerEvents : public timer_base<Base>
         clock_gettime(CLOCK_MONOTONIC, &curtime);
 #endif
         newalarm.it_interval = {0, 0};
-        newalarm.it_value.tv_sec = newtime.it_value.tv_sec - curtime.tv_sec;
-#if defined(__APPLE__)
-        newalarm.it_value.tv_usec = newtime.it_value.tv_usec - curtime.tv_nsec / 1000;
-#else
-        newalarm.it_value.tv_usec = (newtime.it_value.tv_nsec - curtime.tv_nsec) / 1000;
-#endif
+        newalarm.it_value.tv_sec = newtime.tv_sec - curtime.tv_sec;
+        newalarm.it_value.tv_usec = (newtime.tv_nsec - curtime.tv_nsec) / 1000;
+
         if (newalarm.it_value.tv_usec < 0) {
             newalarm.it_value.tv_usec += 1000000;
             newalarm.it_value.tv_sec--;
         }
         setitimer(ITIMER_REAL, &newalarm, nullptr);
     }
-
-#if defined(__APPLE__)
-#undef itimerspec
-#endif
     
     protected:
     
@@ -115,11 +97,13 @@ template <class Base> class ITimerEvents : public timer_base<Base>
 
     void addTimer(timer_handle_t &h, void *userdata, clock_type clock = clock_type::MONOTONIC)
     {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
         timer_queue.allocate(h, userdata);
     }
     
     void removeTimer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
     {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
         removeTimer_nolock(timer_id, clock);
     }
     
@@ -136,12 +120,13 @@ template <class Base> class ITimerEvents : public timer_base<Base>
     void setTimer(timer_handle_t &timer_id, struct timespec &timeout, struct timespec &interval,
             bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
     {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
         auto &ts = timer_queue.node_data(timer_id);
         ts.interval_time = interval;
         ts.expiry_count = 0;
+        ts.enabled = enable;
 
-        // TODO also update interval / enabled
-        
         if (timer_queue.is_queued(timer_id)) {
             // Already queued; alter timeout
             if (timer_queue.set_priority(timer_id, timeout)) {
@@ -153,8 +138,6 @@ template <class Base> class ITimerEvents : public timer_base<Base>
                 set_timer_from_queue();
             }
         }
-        
-        // TODO locking (here and everywhere)
     }
 
     // Set timer relative to current time:    
@@ -183,6 +166,7 @@ template <class Base> class ITimerEvents : public timer_base<Base>
     // Enables or disabling report of timeouts (does not stop timer)
     void enableTimer(timer_handle_t &timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
     {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
         enableTimer_nolock(timer_id, enable, clock);
     }
     
@@ -201,6 +185,7 @@ template <class Base> class ITimerEvents : public timer_base<Base>
 
     void stop_timer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
     {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
         stop_timer_nolock(timer_id, clock);
     }
 
index 9ff09d053ae4c1a9acd4a3fe21f19f74c8d9bbb0..32131d2a777cea8fc82b24c8f57bc8e3f120e69b 100644 (file)
@@ -157,15 +157,25 @@ class svector
         return array[size_v - 1];
     }
 
-    T* begin() const
+    T* begin()
     {
         return array;
     }
+    
+    const T *begin() const
+    {
+       return array;
+    }
 
-    T* end() const
+    T* end()
     {
         return array + size_v;
     }
+    
+    const T *end() const
+    {
+       return array + size_v;
+    }
 };
 
 
index a73e92e8f8d1858426a1b5be4da002547e57baa6..e660efe71ed02aa3ebd307ef49057e7ffeaa1c03 100644 (file)
@@ -65,6 +65,8 @@ template <class Base> class TimerFdEvents : public timer_base<Base>
     void setTimer(timer_handle_t & timer_id, struct timespec &timeout, struct timespec &interval,
             timer_queue_t &queue, int fd, bool enable) noexcept
     {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+
         auto &ts = queue.node_data(timer_id);
         ts.interval_time = interval;
         ts.expiry_count = 0;
@@ -81,8 +83,6 @@ template <class Base> class TimerFdEvents : public timer_base<Base>
                 set_timer_from_queue(fd, queue);
             }
         }
-
-        // TODO locking (here and everywhere)
     }
 
     timer_queue_t & get_queue(clock_type clock)
@@ -139,12 +139,14 @@ template <class Base> class TimerFdEvents : public timer_base<Base>
     // Add timer, store into given handle
     void addTimer(timer_handle_t &h, void *userdata, clock_type clock = clock_type::MONOTONIC)
     {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
         timer_queue_t & queue = get_queue(clock);
         queue.allocate(h, userdata);
     }
     
     void removeTimer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
     {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
         removeTimer_nolock(timer_id, clock);
     }
     
@@ -159,6 +161,7 @@ template <class Base> class TimerFdEvents : public timer_base<Base>
 
     void stop_timer(timer_handle_t &timer_id, clock_type clock = clock_type::MONOTONIC) noexcept
     {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
         stop_timer_nolock(timer_id, clock);
     }
 
@@ -219,6 +222,7 @@ template <class Base> class TimerFdEvents : public timer_base<Base>
     // Enables or disabling report of timeouts (does not stop timer)
     void enableTimer(timer_handle_t & timer_id, bool enable, clock_type clock = clock_type::MONOTONIC) noexcept
     {
+        std::lock_guard<decltype(Base::lock)> guard(Base::lock);
         enableTimer_nolock(timer_id, enable, clock);
     }
     
index 18642738a4f7f9a55e8f19d8f90d1b207206fdbd..715841a9197ee86ee15c764d7e1596160b031682 100644 (file)
@@ -70,20 +70,20 @@ namespace dasynq {
 namespace dasynq {
 
 #ifdef __APPLE__
-int pipe2(int filedes[2], int flags)
+inline int pipe2(int filedes[2], int flags)
 {
     if (pipe(filedes) == -1) {
         return -1;
     }
 
     if (flags & O_CLOEXEC) {
-       fcntl(filedes[0], F_SETFD, FD_CLOEXEC);
-       fcntl(filedes[1], F_SETFD, FD_CLOEXEC);
+        fcntl(filedes[0], F_SETFD, FD_CLOEXEC);
+        fcntl(filedes[1], F_SETFD, FD_CLOEXEC);
     }
 
     if (flags & O_NONBLOCK) {
-       fcntl(filedes[0], F_SETFL, O_NONBLOCK);
-       fcntl(filedes[1], F_SETFL, O_NONBLOCK);
+        fcntl(filedes[0], F_SETFL, O_NONBLOCK);
+        fcntl(filedes[1], F_SETFL, O_NONBLOCK);
     }
 
     return 0;
@@ -114,8 +114,9 @@ enum class rearm
     /** Remove the event watcher (and call "removed" callback) */
     REMOVE,
     /** The watcher has been removed - don't touch it! */
-    REMOVED
-// TODO: add a REQUEUE option, which means, "I didn't complete input/output, run me again soon"
+    REMOVED,
+    /** RE-queue the watcher to have its notification called again */
+    REQUEUE
 };
 
 // Forward declarations:
@@ -158,6 +159,8 @@ namespace dprivate {
     {
         template <typename T_Mutex, typename Traits> friend class EventDispatch;
         template <typename T_Mutex, template <typename> class, typename> friend class dasynq::event_loop;
+        friend inline void basewatcher_set_active(BaseWatcher &watcher, bool active);
+        friend inline bool basewatcher_get_deleteme(const BaseWatcher &watcher);
         
         protected:
         WatchType watchType;
@@ -201,6 +204,16 @@ namespace dprivate {
         }
     };
     
+    inline void basewatcher_set_active(BaseWatcher &watcher, bool active)
+    {
+        watcher.active = active;
+    }
+
+    inline bool basewatcher_get_deleteme(const BaseWatcher &watcher)
+    {
+        return watcher.deleteme;
+    }
+
     // Base signal event - not part of public API
     template <typename T_Mutex, typename Traits>
     class BaseSignalWatcher : public BaseWatcher
@@ -409,10 +422,24 @@ namespace dprivate {
         }
     };
     
+    // Do standard post-dispatch processing for a watcher. This handles the case of removing or
+    // re-queing watchers depending on the rearm type.
+    template <typename Loop> void post_dispatch(Loop &loop, BaseWatcher *watcher, rearm rearmType)
+    {
+        if (rearmType == rearm::REMOVE) {
+            loop.getBaseLock().unlock();
+            watcher->watch_removed();
+            loop.getBaseLock().lock();
+        }
+        else if (rearmType == rearm::REQUEUE) {
+            loop.requeueWatcher(watcher);
+        }
+    }
+
     // This class serves as the base class (mixin) for the AEN mechanism class.
     //
     // The EventDispatch class maintains the queued event data structures. It inserts watchers
-    // into the queue when eventes are received (receiveXXX methods).
+    // into the queue when events are received (receiveXXX methods).
     template <typename T_Mutex, typename Traits> class EventDispatch : public Traits
     {
         template <typename, template <typename> class, typename> friend class dasynq::event_loop;
@@ -438,20 +465,20 @@ namespace dprivate {
             event_queue.insert(bwatcher->heap_handle, bwatcher->priority);
         }
         
-        bool isQueued(BaseWatcher *bwatcher)
+        bool isQueued(BaseWatcher *bwatcher) noexcept
         {
             return event_queue.is_queued(bwatcher->heap_handle);
         }
 
-        void dequeueWatcher(BaseWatcher *bwatcher)
+        void dequeueWatcher(BaseWatcher *bwatcher) noexcept
         {
             if (event_queue.is_queued(bwatcher->heap_handle)) {
                 event_queue.remove(bwatcher->heap_handle);
             }
         }
-        
+
         // Remove watcher from the queueing system
-        void release_watcher(BaseWatcher *bwatcher)
+        void release_watcher(BaseWatcher *bwatcher) noexcept
         {
             event_queue.deallocate(bwatcher->heap_handle);
         }
@@ -459,11 +486,11 @@ namespace dprivate {
         protected:
         T_Mutex lock;
 
-        template <typename T> void init(T *loop) { }
+        template <typename T> void init(T *loop) noexcept { }
         
         // Receive a signal; return true to disable signal watch or false to leave enabled
         template <typename T>
-        bool receiveSignal(T &loop_mech, typename Traits::SigInfo & siginfo, void * userdata)
+        bool receiveSignal(T &loop_mech, typename Traits::SigInfo & siginfo, void * userdata) noexcept
         {
             BaseSignalWatcher * bwatcher = static_cast<BaseSignalWatcher *>(userdata);
             bwatcher->siginfo = siginfo;
@@ -472,7 +499,7 @@ namespace dprivate {
         }
         
         template <typename T>
-        void receiveFdEvent(T &loop_mech, typename Traits::FD_r fd_r, void * userdata, int flags)
+        void receiveFdEvent(T &loop_mech, typename Traits::FD_r fd_r, void * userdata, int flags) noexcept
         {
             BaseFdWatcher * bfdw = static_cast<BaseFdWatcher *>(userdata);
             
@@ -509,22 +536,22 @@ namespace dprivate {
             }
         }
         
-        void receiveChildStat(pid_t child, int status, void * userdata)
+        void receiveChildStat(pid_t child, int status, void * userdata) noexcept
         {
             BaseChildWatcher * watcher = static_cast<BaseChildWatcher *>(userdata);
             watcher->child_status = status;
             queueWatcher(watcher);
         }
         
-        void receiveTimerExpiry(timer_handle_t & timer_handle, void * userdata, int intervals)
+        void receiveTimerExpiry(timer_handle_t & timer_handle, void * userdata, int intervals) noexcept
         {
             BaseTimerWatcher * watcher = static_cast<BaseTimerWatcher *>(userdata);
             watcher->intervals = intervals;
             queueWatcher(watcher);
         }
         
-        // Pull a single event from the queue
-        BaseWatcher * pullEvent()
+        // Pull a single event from the queue; returns nullptr if the queue is empty.
+        BaseWatcher * pullEvent() noexcept
         {
             if (event_queue.empty()) {
                 return nullptr;
@@ -611,6 +638,9 @@ class event_loop
     friend class dprivate::child_proc_watcher<my_event_loop_t>;
     friend class dprivate::timer<my_event_loop_t>;
     
+    friend void dprivate::post_dispatch<my_event_loop_t>(my_event_loop_t &loop,
+            dprivate::BaseWatcher *watcher, rearm rearmType);
+
     template <typename, typename> friend class dprivate::fd_watcher_impl;
     template <typename, typename> friend class dprivate::bidi_fd_watcher_impl;
     template <typename, typename> friend class dprivate::signal_watcher_impl;
@@ -682,7 +712,7 @@ class event_loop
     waitqueue<T_Mutex> attn_waitqueue;
     waitqueue<T_Mutex> wait_waitqueue;
     
-    T_Mutex &getBaseLock()
+    T_Mutex &getBaseLock() noexcept
     {
         return loop_mech.lock;
     }
@@ -748,7 +778,7 @@ class event_loop
         }
     }
     
-    void setFdEnabled(BaseWatcher *watcher, int fd, int watch_flags, bool enabled)
+    void setFdEnabled(BaseWatcher *watcher, int fd, int watch_flags, bool enabled) noexcept
     {
         if (enabled) {
             loop_mech.enableFdWatch(fd, watcher, watch_flags | ONE_SHOT);
@@ -758,7 +788,7 @@ class event_loop
         }
     }
 
-    void setFdEnabled_nolock(BaseWatcher *watcher, int fd, int watch_flags, bool enabled)
+    void setFdEnabled_nolock(BaseWatcher *watcher, int fd, int watch_flags, bool enabled) noexcept
     {
         if (enabled) {
             loop_mech.enableFdWatch_nolock(fd, watcher, watch_flags | ONE_SHOT);
@@ -768,7 +798,7 @@ class event_loop
         }
     }
     
-    void deregister(BaseFdWatcher *callback, int fd)
+    void deregister(BaseFdWatcher *callback, int fd) noexcept
     {
         loop_mech.removeFdWatch(fd, callback->watch_flags);
         
@@ -781,7 +811,7 @@ class event_loop
         releaseLock(qnode);        
     }
     
-    void deregister(BaseBidiFdWatcher *callback, int fd)
+    void deregister(BaseBidiFdWatcher *callback, int fd) noexcept
     {
         if (LoopTraits::has_separate_rw_fd_watches) {
             loop_mech.removeBidiFdWatch(fd);
@@ -811,7 +841,7 @@ class event_loop
         }
     }
     
-    void unreserve(BaseChildWatcher *callback)
+    void unreserve(BaseChildWatcher *callback) noexcept
     {
         loop_mech.unreserveChildWatch();
         loop_mech.release_watcher(callback);
@@ -839,7 +869,7 @@ class event_loop
         loop_mech.addReservedChildWatch_nolock(child, callBack);
     }
     
-    void deregister(BaseChildWatcher *callback, pid_t child)
+    void deregister(BaseChildWatcher *callback, pid_t child) noexcept
     {
         loop_mech.removeChildWatch(child);
 
@@ -863,34 +893,34 @@ class event_loop
         }
     }
     
-    void setTimer(BaseTimerWatcher *callBack, struct timespec &timeout, clock_type clock)
+    void setTimer(BaseTimerWatcher *callBack, struct timespec &timeout, clock_type clock) noexcept
     {
         struct timespec interval {0, 0};
         loop_mech.setTimer(callBack->timer_handle, timeout, interval, true, clock);
     }
     
-    void setTimer(BaseTimerWatcher *callBack, struct timespec &timeout, struct timespec &interval, clock_type clock)
+    void setTimer(BaseTimerWatcher *callBack, struct timespec &timeout, struct timespec &interval, clock_type clock) noexcept
     {
         loop_mech.setTimer(callBack->timer_handle, timeout, interval, true, clock);
     }
 
-    void setTimerRel(BaseTimerWatcher *callBack, struct timespec &timeout, clock_type clock)
+    void setTimerRel(BaseTimerWatcher *callBack, struct timespec &timeout, clock_type clock) noexcept
     {
         struct timespec interval {0, 0};
         loop_mech.setTimerRel(callBack->timer_handle, timeout, interval, true, clock);
     }
     
-    void setTimerRel(BaseTimerWatcher *callBack, struct timespec &timeout, struct timespec &interval, clock_type clock)
+    void setTimerRel(BaseTimerWatcher *callBack, struct timespec &timeout, struct timespec &interval, clock_type clock) noexcept
     {
         loop_mech.setTimerRel(callBack->timer_handle, timeout, interval, true, clock);
     }
 
-    void stop_timer(BaseTimerWatcher *callback, clock_type clock)
+    void stop_timer(BaseTimerWatcher *callback, clock_type clock) noexcept
     {
         loop_mech.stop_timer(callback->timer_handle, clock);
     }
 
-    void deregister(BaseTimerWatcher *callback, clock_type clock)
+    void deregister(BaseTimerWatcher *callback, clock_type clock) noexcept
     {
         loop_mech.removeTimer(callback->timer_handle, clock);
         
@@ -908,9 +938,15 @@ class event_loop
         loop_mech.dequeueWatcher(watcher);
     }
 
+    void requeueWatcher(BaseWatcher *watcher) noexcept
+    {
+        loop_mech.queueWatcher(watcher);
+    }
+
     // Acquire the attention lock (when held, ensures that no thread is polling the AEN
-    // mechanism).
-    void getAttnLock(waitqueue_node<T_Mutex> &qnode)
+    // mechanism). This can be used to safely remove watches, since it is certain that
+    // notification callbacks won't be run while the attention lock is held.
+    void getAttnLock(waitqueue_node<T_Mutex> &qnode) noexcept
     {
         std::unique_lock<T_Mutex> ulock(wait_lock);
         attn_waitqueue.queue(&qnode);        
@@ -922,9 +958,11 @@ class event_loop
         }
     }
     
-    // Acquire the poll-wait lock (to be held when polling the AEN mechanism; lower
-    // priority than the attention lock).
-    void getPollwaitLock(waitqueue_node<T_Mutex> &qnode)
+    // Acquire the poll-wait lock (to be held when polling the AEN mechanism; lower priority than
+    // the attention lock). The poll-wait lock is used to prevent more than a single thread from
+    // polling the event loop mechanism at a time; if this is not done, it is basically
+    // impossible to safely deregister watches.
+    void getPollwaitLock(waitqueue_node<T_Mutex> &qnode) noexcept
     {
         std::unique_lock<T_Mutex> ulock(wait_lock);
         if (attn_waitqueue.isEmpty()) {
@@ -941,7 +979,7 @@ class event_loop
     }
     
     // Release the poll-wait/attention lock.
-    void releaseLock(waitqueue_node<T_Mutex> &qnode)
+    void releaseLock(waitqueue_node<T_Mutex> &qnode) noexcept
     {
         std::unique_lock<T_Mutex> ulock(wait_lock);
         waitqueue_node<T_Mutex> * nhead = attn_waitqueue.unqueue();
@@ -958,7 +996,7 @@ class event_loop
         }                
     }
     
-    void processSignalRearm(BaseSignalWatcher * bsw, rearm rearmType)
+    void processSignalRearm(BaseSignalWatcher * bsw, rearm rearmType) noexcept
     {
         // Called with lock held
         if (rearmType == rearm::REARM) {
@@ -967,14 +1005,13 @@ class event_loop
         else if (rearmType == rearm::REMOVE) {
             loop_mech.removeSignalWatch_nolock(bsw->siginfo.get_signo());
         }
+        // Note that signal watchers cannot (currently) be disarmed
     }
 
     // Process rearm return for fd_watcher, including the primary watcher of a bidi_fd_watcher
-    rearm processFdRearm(BaseFdWatcher * bfw, rearm rearmType, bool is_multi_watch)
+    rearm processFdRearm(BaseFdWatcher * bfw, rearm rearmType, bool is_multi_watch) noexcept
     {
-        // Called with lock held;
-        //   bdfw->event_flags contains only with pending (queued) events
-        
+        // Called with lock held
         if (is_multi_watch) {
             BaseBidiFdWatcher * bdfw = static_cast<BaseBidiFdWatcher *>(bfw);
             
@@ -1002,7 +1039,19 @@ class event_loop
                 }
             }
             else if (rearmType == rearm::DISARM) {
-                // TODO should actually disarm.
+                bdfw->watch_flags &= ~IN_EVENTS;
+
+                if (! LoopTraits::has_separate_rw_fd_watches) {
+                    int watch_flags = bdfw->watch_flags;
+                    // without separate r/w watches, enableFdWatch actually sets
+                    // which sides are enabled (i.e. can be used to disable):
+                    loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+                            static_cast<BaseWatcher *>(bdfw),
+                            (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+                }
+                else {
+                    loop_mech.disableFdWatch_nolock(bdfw->watch_fd, IN_EVENTS);
+                }
             }
             else if (rearmType == rearm::REARM) {
                 bdfw->watch_flags |= IN_EVENTS;
@@ -1026,6 +1075,9 @@ class event_loop
                 loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw,
                         (bfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
             }
+            else if (rearmType == rearm::DISARM) {
+                loop_mech.disableFdWatch_nolock(bfw->watch_fd, bfw->watch_flags);
+            }
             else if (rearmType == rearm::REMOVE) {
                 loop_mech.removeFdWatch_nolock(bfw->watch_fd, bfw->watch_flags);
             }
@@ -1034,7 +1086,7 @@ class event_loop
     }
 
     // Process re-arm for the secondary (output) watcher in a Bi-direction Fd watcher.
-    rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, rearm rearmType)
+    rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, rearm rearmType) noexcept
     {
         // Called with lock held
         if (rearmType == rearm::REMOVE) {
@@ -1061,7 +1113,17 @@ class event_loop
             }
         }
         else if (rearmType == rearm::DISARM) {
-            // TODO actually disarm.
+            bdfw->watch_flags &= ~OUT_EVENTS;
+
+            if (! LoopTraits::has_separate_rw_fd_watches) {
+                int watch_flags = bdfw->watch_flags;
+                loop_mech.enableFdWatch_nolock(bdfw->watch_fd,
+                        static_cast<BaseWatcher *>(bdfw),
+                        (watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
+            }
+            else {
+                loop_mech.disableFdWatch_nolock(bdfw->watch_fd, OUT_EVENTS);
+            }
         }
         else if (rearmType == rearm::REARM) {
             bdfw->watch_flags |= OUT_EVENTS;
@@ -1081,7 +1143,7 @@ class event_loop
         return rearmType;
     }
     
-    void processTimerRearm(BaseTimerWatcher *btw, rearm rearmType)
+    void processTimerRearm(BaseTimerWatcher *btw, rearm rearmType) noexcept
     {
         // Called with lock held
         if (rearmType == rearm::REARM) {
@@ -1090,9 +1152,12 @@ class event_loop
         else if (rearmType == rearm::REMOVE) {
             loop_mech.removeTimer_nolock(btw->timer_handle, btw->clock);
         }
-        // TODO DISARM?
+        else if (rearmType == rearm::DISARM) {
+            loop_mech.enableTimer_nolock(btw->timer_handle, false, btw->clock);
+        }
     }
 
+    // Process all queued events; returns true if any events were processed.
     bool processEvents() noexcept
     {
         EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
@@ -1151,27 +1216,34 @@ class event_loop
 
     void run() noexcept
     {
+        // Poll the mechanism first, in case high-priority events are pending:
+        waitqueue_node<T_Mutex> qnode;
+        getPollwaitLock(qnode);
+        loop_mech.pullEvents(false);
+        releaseLock(qnode);
+
         while (! processEvents()) {
-            waitqueue_node<T_Mutex> qnode;
-            
-            // We only allow one thread to poll the mechanism at any time, since otherwise
-            // removing event watchers is a nightmare beyond comprehension.
-            getPollwaitLock(qnode);
-            
             // Pull events from the AEN mechanism and insert them in our internal queue:
+            getPollwaitLock(qnode);
             loop_mech.pullEvents(true);
-            
-            // Now release the wait lock:
             releaseLock(qnode);
         }
     }
-};
 
-typedef event_loop<null_mutex> NEventLoop;
-typedef event_loop<std::mutex> TEventLoop;
+    void poll() noexcept
+    {
+        // Poll the mechanism first, in case high-priority events are pending:
+        waitqueue_node<T_Mutex> qnode;
+        getPollwaitLock(qnode);
+        loop_mech.pullEvents(false);
+        releaseLock(qnode);
+
+        processEvents();
+    }
+};
 
-// from dasync.cc:
-TEventLoop & getSystemLoop();
+typedef event_loop<null_mutex> event_loop_n;
+typedef event_loop<std::mutex> event_loop_th;
 
 namespace dprivate {
 
@@ -1259,11 +1331,7 @@ class signal_watcher_impl : public signal_watcher<EventLoop>
 
             loop.processSignalRearm(this, rearmType);
 
-            if (rearmType == rearm::REMOVE) {
-                loop.getBaseLock().unlock();
-                this->watch_removed();
-                loop.getBaseLock().lock();
-            }
+            post_dispatch(loop, this, rearmType);
         }
     }
 };
@@ -1396,11 +1464,7 @@ class fd_watcher_impl : public fd_watcher<EventLoop>
 
             rearmType = loop.processFdRearm(this, rearmType, false);
 
-            if (rearmType == rearm::REMOVE) {
-                loop.getBaseLock().unlock();
-                this->watch_removed();
-                loop.getBaseLock().lock();
-            }
+            post_dispatch(loop, this, rearmType);
         }
     }
 };
@@ -1447,14 +1511,6 @@ class bidi_fd_watcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::
     
     public:
 
-    // This should never actually get called:
-    /*
-    rearm fdEvent(EventLoop &eloop, int fd, int flags) final
-    {
-        return rearm::REARM; // should not be reachable.
-    };
-    */
-
     void set_in_watch_enabled(EventLoop &eloop, bool b) noexcept
     {
         eloop.getBaseLock().lock();
@@ -1525,6 +1581,41 @@ class bidi_fd_watcher : private dprivate::BaseBidiFdWatcher<typename EventLoop::
         eloop.deregister(this, this->watch_fd);
     }
     
+    template <typename T>
+    static bidi_fd_watcher<EventLoop> *add_watch(EventLoop &eloop, int fd, int flags, T watchHndlr)
+    {
+        class LambdaBidiWatcher : public bidi_fd_watcher_impl<EventLoop, LambdaBidiWatcher>
+        {
+            private:
+            T watchHndlr;
+
+            public:
+            LambdaBidiWatcher(T watchHandlr_a) : watchHndlr(watchHandlr_a)
+            {
+                //
+            }
+
+            rearm read_ready(EventLoop &eloop, int fd)
+            {
+                return watchHndlr(eloop, fd, IN_EVENTS);
+            }
+
+            rearm write_ready(EventLoop &eloop, int fd)
+            {
+                return watchHndlr(eloop, fd, OUT_EVENTS);
+            }
+
+            void watch_removed() noexcept override
+            {
+                delete this;
+            }
+        };
+
+        LambdaBidiWatcher * lfd = new LambdaBidiWatcher(watchHndlr);
+        lfd->add_watch(eloop, fd, flags);
+        return lfd;
+    }
+
     // virtual rearm read_ready(EventLoop &eloop, int fd) noexcept = 0;
     // virtual rearm write_ready(EventLoop &eloop, int fd) noexcept = 0;
 };
@@ -1551,16 +1642,14 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher<EventLoop>
 
             rearmType = loop.processFdRearm(this, rearmType, true);
 
-            if (rearmType == rearm::REMOVE) {
-                loop.getBaseLock().unlock();
-                this->watch_removed();
-                loop.getBaseLock().lock();
-            }
+            post_dispatch(loop, this, rearmType);
         }
     }
 
     void dispatch_second(void *loop_ptr) noexcept override
     {
+        auto &outwatcher = bidi_fd_watcher<EventLoop>::outWatcher;
+
         EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
         loop.getBaseLock().unlock();
 
@@ -1570,18 +1659,19 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher<EventLoop>
 
         if (rearmType != rearm::REMOVED) {
             this->event_flags &= ~OUT_EVENTS;
-            this->active = false;
-            if (this->deleteme) {
+            basewatcher_set_active(outwatcher, false);
+            if (basewatcher_get_deleteme(outwatcher)) {
                 // We don't want a watch that is marked "deleteme" to re-arm itself.
                 rearmType = rearm::REMOVE;
             }
 
             rearmType = loop.processSecondaryRearm(this, rearmType);
 
-            if (rearmType == rearm::REMOVE) {
-                loop.getBaseLock().unlock();
-                this->watch_removed();
-                loop.getBaseLock().lock();
+            if (rearmType == rearm::REQUEUE) {
+                post_dispatch(loop, &outwatcher, rearmType);
+            }
+            else {
+                post_dispatch(loop, this, rearmType);
             }
         }
     }
@@ -1648,11 +1738,13 @@ class child_proc_watcher : private dprivate::BaseChildWatcher<typename EventLoop
     // Returns:
     // - the child pid in the parent
     // - 0 in the child
-    pid_t fork(EventLoop &eloop)
+    pid_t fork(EventLoop &eloop, bool from_reserved = false)
     {
         if (EventLoop::loop_traits_t::supports_childwatch_reservation) {
             // Reserve a watch, fork, then claim reservation
-            reserve_watch(eloop);
+            if (! from_reserved) {
+                reserve_watch(eloop);
+            }
             
             auto &lock = eloop.getBaseLock();
             lock.lock();
@@ -1748,12 +1840,7 @@ class child_proc_watcher_impl : public child_proc_watcher<EventLoop>
             }
 
             // rearmType = loop.process??;
-
-            if (rearmType == rearm::REMOVE) {
-                loop.getBaseLock().unlock();
-                this->watch_removed();
-                loop.getBaseLock().lock();
-            }
+            post_dispatch(loop, this, rearmType);
         }
     }
 };
@@ -1804,6 +1891,43 @@ class timer : private BaseTimerWatcher<typename EventLoop::mutex_t>
         eloop.deregister(this, this->clock);
     }
 
+    template <typename T>
+    static timer<EventLoop> *add_timer(EventLoop &eloop, clock_type clock, bool relative,
+            struct timespec &timeout, struct timespec &interval, T watchHndlr)
+    {
+        class lambda_timer : public timer_impl<EventLoop, lambda_timer>
+        {
+            private:
+            T watchHndlr;
+
+            public:
+            lambda_timer(T watchHandlr_a) : watchHndlr(watchHandlr_a)
+            {
+                //
+            }
+
+            rearm timer_expiry(EventLoop &eloop, int intervals)
+            {
+                return watchHndlr(eloop, intervals);
+            }
+
+            void watch_removed() noexcept override
+            {
+                delete this;
+            }
+        };
+
+        lambda_timer * lt = new lambda_timer(watchHndlr);
+        lt->add_timer(eloop, clock);
+        if (relative) {
+            lt->arm_timer_rel(eloop, timeout, interval);
+        }
+        else {
+            lt->arm_timer(eloop, timeout, interval);
+        }
+        return lt;
+    }
+
     // Timer expired, and the given number of intervals have elapsed before
     // expiry event was queued. Normally intervals == 1 to indicate no
     // overrun.
@@ -1832,11 +1956,7 @@ class timer_impl : public timer<EventLoop>
 
             loop.processTimerRearm(this, rearmType);
 
-            if (rearmType == rearm::REMOVE) {
-                loop.getBaseLock().unlock();
-                this->watch_removed();
-                loop.getBaseLock().lock();
-            }
+            post_dispatch(loop, this, rearmType);
         }
     }
 };