Updated bundled Dasynq to 1.1.7
author Davin McCall <davmac@davmac.org>
Sun, 22 Dec 2019 10:28:42 +0000 (20:28 +1000)
committer Davin McCall <davmac@davmac.org>
Sun, 22 Dec 2019 10:28:42 +0000 (20:28 +1000)
This fixes a slow memory leak.

src/dasynq/dasynq-btree_set.h
src/dasynq/dasynq-daryheap.h
src/dasynq/dasynq-posixtimer.h
src/dasynq/dasynq-pselect.h
src/dasynq/dasynq-select.h
src/dasynq/dasynq-stableheap.h
src/dasynq/dasynq-svec.h
src/dasynq/dasynq.h

index bc547a26deb876e6f5f50041bac7425fa12289c2..6b54a7d120209bc98d8ec544bc9468c92391399d 100644 (file)
--- a/src/dasynq/dasynq-btree_set.h
+++ b/src/dasynq/dasynq-btree_set.h
@@ -2,6 +2,7 @@
 #define DASYNQ_BTREE_SET_H
 
 #include <functional>
+#include <utility>
 
 namespace dasynq {
 
@@ -87,7 +88,7 @@ class btree_set
     {
         union nodedata_u
         {
-            T data; // TODO this should be obscured to avoid early construction
+            T data;
 
             nodedata_u() {}
         };
@@ -296,10 +297,10 @@ class btree_set
     }
 
     // Allocate a slot, but do not incorporate into the heap:
-    template <typename ...U> void allocate(handle_t &hn, U... u)
+    template <typename ...U> void allocate(handle_t &hn, U&&... u)
     {
         alloc_slot();
-        new (& hn.nodedata.data) T(u...);
+        new (& hn.nodedata.data) T(std::forward<U>(u)...);
     }
 
     void deallocate(handle_t & hn) noexcept
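
The allocate() change above (and the matching changes in dasynq-daryheap.h and dasynq-stableheap.h below) switches the constructor-argument pack from pass-by-value to forwarding references with std::forward, so arguments are no longer copied an extra time on their way into the placement-new call. A minimal sketch of the difference (not part of the patch; the slot type is a hypothetical stand-in for dasynq's handle_t):

// Sketch only, not dasynq code: by-value vs forwarded constructor arguments.
#include <new>
#include <string>
#include <utility>

template <typename T>
struct slot {
    alignas(T) unsigned char raw[sizeof(T)];

    // Old style: the pack is taken by value, so each argument is copied into
    // the pack before being handed to the constructor.
    template <typename ...U> void construct_by_value(U... u)
    {
        new (raw) T(u...);
    }

    // New style: forwarding references plus std::forward preserve the value
    // category, so rvalue arguments are moved rather than copied.
    template <typename ...U> void construct_forwarded(U&&... u)
    {
        new (raw) T(std::forward<U>(u)...);
    }

    void destroy() { reinterpret_cast<T *>(raw)->~T(); }
};

int main()
{
    slot<std::string> s;
    std::string big(1000, 'x');
    s.construct_forwarded(std::move(big));  // moved in; the by-value form would copy
    s.destroy();
    return 0;
}
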
index bf98c7d788ccc8f79138d9039259b72c2d959a02..d824137966ffc59d561c745c8e5ad0668a924a13 100644 (file)
--- a/src/dasynq/dasynq-daryheap.h
+++ b/src/dasynq/dasynq-daryheap.h
@@ -1,22 +1,46 @@
-#ifndef DASYNC_DARYHEAP_H_INCLUDED
-#define DASYNC_DARYHEAP_H_INCLUDED
+#ifndef DASYNQ_DARYHEAP_H_INCLUDED
+#define DASYNQ_DARYHEAP_H_INCLUDED
 
-#include "dasynq-svec.h"
 #include <type_traits>
 #include <functional>
+#include <utility>
 #include <limits>
 
+#include "dasynq-svec.h"
+
+
 namespace dasynq {
 
 /**
- * Priority queue implementation based on a binary heap.
+ * Priority queue implementation based on a heap with parameterised fan-out. All nodes are stored
+ * in a vector, with the root at position 0, and each node has N child nodes, at positions
+ * (p * N + 1) through (p * N + N) where p is the (parent) node position.
+ *
+ * With N=2, this is a binary heap. Higher values of N may give better performance due to better
+ * cache locality, but also increase fan-out which will (if too high) also reduce performance.
+ *
+ * The destructor will not clean up (destruct) objects that have been added to the queue. If the
+ * destructor of the element type (T) is non-trivial, all handles should be de-allocated before
+ * destroying the queue.
  *
- * Heap entry "handles" maintain an index into the heap. When the position of a node in the heap
- * changes, its handle must be updated.
+ * Implementation details:
+ *
+ * Adding a node returns a "handle", which maintains an index into the heap. When the position of
+ * a node in the heap changes, its handle must be updated (the advantage is that changing priority
+ * of or removing a node does not require a linear search for the node).
+ *
+ * Node data is actually stored as part of the handle, not in the queue.
+ *
+ * To add a node to the queue, it is inserted at the end and then "bubbled down" to its correct
+ * location according to priority. To remove a node, the node is replaced with the last node in
+ * the vector and then that is "bubbled up" to the correct position.
+ *
+ * Parameters:
  *
  * T : node data type
  * P : priority type (eg int)
  * Compare : functional object type to compare priorities
+ * N : fan out factor (number of child nodes per node)
  */
 template <typename T, typename P, typename Compare = std::less<P>, int N = 4>
 class dary_heap
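
The new comment block documents the flat array layout: the root lives at index 0 and the children of node p occupy positions p * N + 1 through p * N + N. A standalone sketch of that index arithmetic (not dasynq code), using N = 4 as in the default template argument:

// Sketch only: parent/child index computation for a d-ary heap stored in a vector.
#include <cstddef>
#include <iostream>

constexpr std::size_t N = 4;

constexpr std::size_t parent(std::size_t pos)      { return (pos - 1) / N; }
constexpr std::size_t first_child(std::size_t pos) { return pos * N + 1; }
constexpr std::size_t last_child(std::size_t pos)  { return pos * N + N; }

int main()
{
    // With N = 4: children of the root (0) are 1..4, children of node 2 are 9..12.
    std::cout << first_child(0) << ".." << last_child(0) << '\n';  // 1..4
    std::cout << first_child(2) << ".." << last_child(2) << '\n';  // 9..12
    std::cout << parent(10) << '\n';                               // 2
    return 0;
}
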
@@ -27,14 +51,17 @@ class dary_heap
 
     private:
 
+    static_assert(std::is_nothrow_move_assignable<P>::value, "P must be no-except move assignable");
+
     // Actual heap node
     class heap_node
     {
         public:
-        P data;
-        handle_t * hnd_p;
+        P prio;
+        handle_t * hnd;
 
-        heap_node(handle_t * hnd, const P &odata) : data(odata), hnd_p(hnd)
+        heap_node(handle_t * hnd_p, const P &prio_p) noexcept(std::is_nothrow_copy_constructible<P>::value)
+            : prio(prio_p), hnd(hnd_p)
         {
             // nothing to do
         }
@@ -81,31 +108,30 @@ class dary_heap
 
     private:
 
-    // Bubble a newly added timer down to the correct position
+    // Bubble a newly added node down to the correct position
     bool bubble_down(hindex_t pos) noexcept
     {
-        handle_t * ohndl = hvec[pos].hnd_p;
-        P op = hvec[pos].data;
+        handle_t * ohndl = hvec[pos].hnd;
+        P op = hvec[pos].prio;
         return bubble_down(pos, ohndl, op);
     }
 
     bool bubble_down(hindex_t pos, handle_t * ohndl, const P &op) noexcept
     {
-        // int pos = v.size() - 1;
         Compare lt;
         while (pos > 0) {
             hindex_t parent = (pos - 1) / N;
-            if (! lt(op, hvec[parent].data)) {
+            if (! lt(op, hvec[parent].prio)) {
                 break;
             }
 
-            hvec[pos] = hvec[parent];
-            hvec[pos].hnd_p->heap_index = pos;
+            hvec[pos] = std::move(hvec[parent]);
+            hvec[pos].hnd->heap_index = pos;
             pos = parent;
         }
 
-        hvec[pos].hnd_p = ohndl;
-        hvec[pos].data = op;
+        hvec[pos].hnd = ohndl;
+        hvec[pos].prio = std::move(op);
         ohndl->heap_index = pos;
 
         return pos == 0;
@@ -113,8 +139,8 @@ class dary_heap
 
     void bubble_up(hindex_t pos = 0) noexcept
     {
-        P p = hvec[pos].data;
-        handle_t &h = *(hvec[pos].hnd_p);
+        P p = hvec[pos].prio;
+        handle_t &h = *(hvec[pos].hnd);
         bubble_up(pos, h, p);
     }
 
@@ -129,34 +155,35 @@ class dary_heap
         hindex_t max = (rmax - 1) / N;
 
         while (pos <= max) {
+            // Find (select) the smallest child node
             hindex_t lchild = pos * N + 1;
             hindex_t selchild = lchild;
             hindex_t rchild = std::min(lchild + N, rmax);
             for (hindex_t i = lchild + 1; i < rchild; i++) {
-                if (lt(hvec[i].data, hvec[selchild].data)) {
+                if (lt(hvec[i].prio, hvec[selchild].prio)) {
                     selchild = i;
                 }
             }
 
-            if (! lt(hvec[selchild].data, p)) {
+            if (! lt(hvec[selchild].prio, p)) {
                 break;
             }
 
-            hvec[pos] = hvec[selchild];
-            hvec[pos].hnd_p->heap_index = pos;
+            hvec[pos] = std::move(hvec[selchild]);
+            hvec[pos].hnd->heap_index = pos;
             pos = selchild;
         }
 
-        hvec[pos].hnd_p = &h;
-        hvec[pos].data = p;
+        hvec[pos].hnd = &h;
+        hvec[pos].prio = std::move(p);
         h.heap_index = pos;
     }
 
     void remove_h(hindex_t hidx) noexcept
     {
-        hvec[hidx].hnd_p->heap_index = -1;
+        hvec[hidx].hnd->heap_index = -1;
         if (hvec.size() != hidx + 1) {
-            bubble_up(hidx, *(hvec.back().hnd_p), hvec.back().data);
+            bubble_up(hidx, *(hvec.back().hnd), hvec.back().prio);
             hvec.pop_back();
         }
         else {
@@ -173,12 +200,13 @@ class dary_heap
 
     // Allocate a slot, but do not incorporate into the heap:
     //  u... : parameters for data constructor T::T(...)
-    template <typename ...U> void allocate(handle_t & hnd, U... u)
+    template <typename ...U> void allocate(handle_t & hnd, U&&... u)
     {
-        new (& hnd.hd_u.hd) T(u...);
+        new (& hnd.hd_u.hd) T(std::forward<U>(u)...);
         hnd.heap_index = -1;
-        constexpr hindex_t max_allowed = std::numeric_limits<hindex_t>::is_signed ?
-                std::numeric_limits<hindex_t>::max() : ((hindex_t) - 2);
+
+        // largest object size is PTRDIFF_MAX, so we expect the largest vector is that / sizeof node:
+        constexpr hindex_t max_allowed = (std::numeric_limits<ptrdiff_t>::max() - 1) / sizeof(heap_node);
 
         if (num_nodes == max_allowed) {
             throw std::bad_alloc();
@@ -224,57 +252,57 @@ class dary_heap
     bool insert(handle_t & hnd, const P &pval) noexcept
     {
         hnd.heap_index = hvec.size();
-        //hvec.emplace_back(&hnd, pval);
+        // emplace an empty node; data/prio will be stored via bubble_down.
         hvec.emplace_back();
         return bubble_down(hvec.size() - 1, &hnd, pval);
     }
 
     // Get the root node handle. (Returns a handle_t or reference to handle_t).
-    handle_t & get_root()
+    handle_t & get_root() noexcept
     {
-        return * hvec[0].hnd_p;
+        return * hvec[0].hnd;
     }
 
-    P &get_root_priority()
+    P &get_root_priority() noexcept
     {
-        return hvec[0].data;
+        return hvec[0].prio;
     }
 
-    void pull_root()
+    void pull_root() noexcept
     {
         remove_h(0);
     }
 
-    void remove(handle_t & hnd)
+    void remove(handle_t & hnd) noexcept
     {
         remove_h(hnd.heap_index);
     }
 
-    bool empty()
+    bool empty() noexcept
     {
         return hvec.empty();
     }
 
-    bool is_queued(handle_t & hnd)
+    bool is_queued(handle_t & hnd) noexcept
     {
         return hnd.heap_index != (hindex_t) -1;
     }
 
     // Set a node priority. Returns true iff the node becomes the root node (and wasn't before).
-    bool set_priority(handle_t & hnd, const P& p)
+    bool set_priority(handle_t & hnd, const P& p) noexcept
     {
         int heap_index = hnd.heap_index;
 
         Compare lt;
-        if (lt(hvec[heap_index].data, p)) {
+        if (lt(hvec[heap_index].prio, p)) {
             // Increase key
-            hvec[heap_index].data = p;
+            hvec[heap_index].prio = p;
             bubble_up(heap_index);
             return false;
         }
         else {
             // Decrease key
-            hvec[heap_index].data = p;
+            hvec[heap_index].prio = p;
             return bubble_down(heap_index);
         }
     }
index 17816c3b7fc43e0520dcdee440d2795e59e0a8e5..30eb590b9ae46daa8a6d4b2ec89d9ef24e23c673 100644 (file)
--- a/src/dasynq/dasynq-posixtimer.h
+++ b/src/dasynq/dasynq-posixtimer.h
@@ -1,5 +1,6 @@
 #include <vector>
 #include <utility>
+#include <cstdlib>
 
 #include <sys/time.h>
 #include <time.h>
@@ -70,7 +71,7 @@ class posix_timer_events : public timer_base<Base>
         }
     }
 
-    timer_t &timer_for_clock(clock_type clock)
+    timer_t &timer_for_clock(clock_type clock) noexcept
     {
         switch (clock) {
         case clock_type::MONOTONIC:
@@ -79,6 +80,7 @@ class posix_timer_events : public timer_base<Base>
             return real_timer;
         default:
             DASYNQ_UNREACHABLE;
+            std::abort();
         }
     }
 
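
The added std::abort() guards the default case of a reference-returning switch: on a compiler where the unreachable marker expands to nothing, control could otherwise fall off the end of the function without returning a value. A minimal sketch of the same pattern (not dasynq code; MY_UNREACHABLE is a stand-in for DASYNQ_UNREACHABLE):

// Sketch only: keeping a reference-returning switch warning-free.
#include <cstdlib>

#if defined(__GNUC__)
#define MY_UNREACHABLE __builtin_unreachable()
#else
#define MY_UNREACHABLE
#endif

enum class clock_kind { monotonic, wall };

struct timers {
    int mono_timer = 0;
    int real_timer = 0;

    int &timer_for(clock_kind c) noexcept
    {
        switch (c) {
        case clock_kind::monotonic:
            return mono_timer;
        case clock_kind::wall:
            return real_timer;
        default:
            MY_UNREACHABLE;
            std::abort();  // never reached; satisfies the compiler if the macro is a no-op
        }
    }
};

int main()
{
    timers t;
    t.timer_for(clock_kind::monotonic) = 42;
    return t.timer_for(clock_kind::monotonic) == 42 ? 0 : 1;
}
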
index f4a515721d8fd8e87bee1b9ce2c60d413735067e..b40b1bfea0400f9d1a370ba7419e5232e5fbaf07 100644 (file)
--- a/src/dasynq/dasynq-pselect.h
+++ b/src/dasynq/dasynq-pselect.h
@@ -262,8 +262,7 @@ template <class Base> class pselect_events : public signal_events<Base, false>
                     this->sigmaskf(SIG_SETMASK, &sigmask, &origmask);
                     this->sigmaskf(SIG_SETMASK, &origmask, nullptr);
                 }
-
-                if (r == 0 && do_wait) {
+                else {
                     // timeout:
                     Base::lock.lock();
                     this->process_monotonic_timers();
index e0ce78a8a89474d9b1896ba85f58c6f110daaf64..4c96e2cc86876fba39e5a2f9cadaabe8dfb2e120 100644 (file)
--- a/src/dasynq/dasynq-select.h
+++ b/src/dasynq/dasynq-select.h
@@ -4,7 +4,6 @@
 #include <system_error>
 #include <vector>
 #include <atomic>
-#include <tuple>
 
 #include <sys/time.h>
 #include <sys/types.h>
index af4ea7b4a41ec2fe3661aad99d30655783e51c47..074722676ec10941b34bed4d0cc07ab28ed39660 100644 (file)
--- a/src/dasynq/dasynq-stableheap.h
+++ b/src/dasynq/dasynq-stableheap.h
@@ -9,6 +9,7 @@
 // The generation counter is a 64-bit integer and can not realistically overflow.
 
 #include <functional>
+#include <utility>
 
 namespace dasynq {
 
@@ -73,7 +74,7 @@ class stable_heap : private H<T,stable_prio<P>,compare_stable_prio<P,C>>
 
     template <typename ...U> void allocate(handle_t & hnd, U&& ...u)
     {
-        Base::allocate(hnd, u...);
+        Base::allocate(hnd, std::forward<U>(u)...);
     }
 
     static void init_handle(handle_t &hndl)
index b8cc7f78b6fb7984285a145dcc754482bc171343..77039ec454dcd31c6f70e0aa318c9ecf8f97cfeb 100644 (file)
--- a/src/dasynq/dasynq-svec.h
+++ b/src/dasynq/dasynq-svec.h
@@ -21,7 +21,6 @@ class svector
         T elem;
 
         vec_node() { }
-        ~vec_node() { }
     };
 
     vec_node * array;
index e65bad6f6242110ca94a5d96ee4aba6e68e64af3..bf9706e4ebb37558f1b3e57ff53c7db4772a1f32 100644 (file)
--- a/src/dasynq/dasynq.h
+++ b/src/dasynq/dasynq.h
@@ -242,6 +242,11 @@ namespace dprivate {
             return nullptr;
         }
         
+        waitqueue_node<null_mutex> * get_second()
+        {
+            return nullptr;
+        }
+
         bool check_head(waitqueue_node<null_mutex> &node)
         {
             return true;
@@ -278,6 +283,11 @@ namespace dprivate {
             return head;
         }
         
+        waitqueue_node<T_Mutex> * get_second()
+        {
+            return head->next;
+        }
+
         bool check_head(waitqueue_node<T_Mutex> &node)
         {
             return head == &node;
@@ -312,9 +322,16 @@ namespace dprivate {
 
         template <typename Loop>
         static rearm process_fd_rearm(Loop &loop, typename Loop::base_fd_watcher *bfw,
-                rearm rearm_type, bool is_multi_watch) noexcept
+                rearm rearm_type) noexcept
         {
-            return loop.process_fd_rearm(bfw, rearm_type, is_multi_watch);
+            return loop.process_fd_rearm(bfw, rearm_type);
+        }
+
+        template <typename Loop>
+        static rearm process_primary_rearm(Loop &loop, typename Loop::base_bidi_fd_watcher *bdfw,
+                rearm rearm_type) noexcept
+        {
+            return loop.process_primary_rearm(bdfw, rearm_type);
         }
 
         template <typename Loop>
@@ -350,14 +367,22 @@ namespace dprivate {
         {
             loop.requeue_watcher(watcher);
         }
+
+        template <typename Loop>
+        static void release_watcher(Loop &loop, base_watcher *watcher) noexcept
+        {
+            loop.release_watcher(watcher);
+        }
     };
 
     // Do standard post-dispatch processing for a watcher. This handles the case of removing or
-    // re-queueing watchers depending on the rearm type.
+    // re-queueing watchers depending on the rearm type. This is called from the individual
+    // watcher dispatch functions to handle REMOVE or REQUEUE re-arm values.
     template <typename Loop> void post_dispatch(Loop &loop, base_watcher *watcher, rearm rearm_type)
     {
         if (rearm_type == rearm::REMOVE) {
             loop_access::get_base_lock(loop).unlock();
+            loop_access::release_watcher(loop, watcher);
             watcher->watch_removed();
             loop_access::get_base_lock(loop).lock();
         }
@@ -366,15 +391,33 @@ namespace dprivate {
         }
     }
 
-    // This class serves as the base class (mixin) for the backend mechanism.
+    // Post-dispatch handling for bidi fd watchers.
+    template <typename Loop> void post_dispatch(Loop &loop, bidi_fd_watcher<Loop> *bdfd_watcher,
+            base_watcher *out_watcher, rearm rearm_type)
+    {
+        base_watcher *watcher = (base_watcher *)bdfd_watcher;
+        if (rearm_type == rearm::REMOVE) {
+            loop_access::get_base_lock(loop).unlock();
+            loop_access::release_watcher(loop, watcher);
+            loop_access::release_watcher(loop, out_watcher);
+            watcher->watch_removed();
+            loop_access::get_base_lock(loop).lock();
+        }
+        else if (rearm_type == rearm::REQUEUE) {
+            loop_access::requeue_watcher(loop, watcher);
+        }
+    }
+
+    // The event_dispatch class serves as the base class (mixin) for the backend mechanism. It
+    // mostly manages queueing and dequeueing of events and maintains/owns the relevant data
+    // structures, including a mutex lock.
     //
-    // The event_dispatch class maintains the queued event data structures. It inserts watchers
-    // into the queue when events are received (receiveXXX methods). It also owns a mutex used
-    // to protect those structures.
+    // The backend mechanism should call one of the receiveXXX functions to notify of an event
+    // received. The watcher will then be queued.
     //
-    // In general the methods should be called with lock held. In practice this means that the
-    // event loop backend implementations must obtain the lock; they are also free to use it to
-    // protect their own internal data structures.
+    // In general the functions should be called with lock held. In practice this means that the
+    // event loop backend implementations (that deposit received events here) must obtain the
+    // lock; they are also free to use it to protect their own internal data structures.
     template <typename Traits, typename LoopTraits> class event_dispatch
     {
         friend class dasynq::event_loop<typename LoopTraits::mutex_t, LoopTraits>;;
@@ -528,7 +571,6 @@ namespace dprivate {
                 // If the watcher is active, set deleteme true; the watcher will be removed
                 // at the end of current processing (i.e. when active is set false).
                 watcher->deleteme = true;
-                release_watcher(watcher);
                 lock.unlock();
             }
             else {
@@ -668,15 +710,27 @@ class event_loop
     // So, we use two wait queues protected by a single mutex. The "attn_waitqueue"
     // (attention queue) is the high-priority queue, used for threads wanting to
     // unwatch event sources. The "wait_waitquueue" is the queue used by threads
-    // that wish to actually poll for events.
+    // that wish to actually poll for events, while they are waiting for the main
+    // queue to become quiet.
     // - The head of the "attn_waitqueue" is always the holder of the lock
     // - Therefore, a poll-waiter must be moved from the wait_waitqueue to the
     //   attn_waitqueue to actually gain the lock. This is only done if the
     //   attn_waitqueue is otherwise empty.
     // - The mutex only protects manipulation of the wait queues, and so should not
     //   be highly contended.
+    //
+    // To claim the lock for a poll-wait, the procedure is:
+    //    - check if the attn_waitqueue is empty;
+    //    - if it is, insert node at the head, thus claiming the lock, and return
+    //    - otherwise, insert node in the wait_waitqueue, and wait
+    // To claim the lock for an unwatch, the procedure is:
+    //    - insert node in the attn_waitqueue
+    //    - if the node is at the head of the queue, lock is claimed; return
+    //    - otherwise, if a poll is in progress, interrupt it
+    //    - wait until our node is at the head of the attn_waitqueue
     
     mutex_t wait_lock;  // protects the wait/attention queues
+    bool long_poll_running = false;  // whether any thread is polling the backend (with non-zero timeout)
     waitqueue<mutex_t> attn_waitqueue;
     waitqueue<mutex_t> wait_waitqueue;
     
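
The extended comment in this hunk spells out how the attention queue and the wait queue interact, and the new long_poll_running flag records whether a thread is currently blocked in a long backend poll. A much-reduced sketch of the two claim procedures as described (not dasynq code: plain ids stand in for waitqueue_node, and the condition-variable waiting and interrupt_wait() call are stubbed out as comments):

// Sketch only: control flow of the two-queue lock claim procedure.
#include <deque>

struct lock_state {
    std::deque<int> attn_waitqueue;   // high priority: unwatch / attention requests
    std::deque<int> wait_waitqueue;   // low priority: threads wanting to poll
    bool long_poll_running = false;
};

// Claim for a poll-wait: take the lock only if nobody holds it, otherwise park
// in the low-priority queue and wait to be promoted on release.
bool claim_for_poll(lock_state &s, int node)
{
    if (s.attn_waitqueue.empty()) {
        s.attn_waitqueue.push_back(node);   // head of attn queue == lock holder
        s.long_poll_running = true;         // the backend poll is about to start
        return true;
    }
    s.wait_waitqueue.push_back(node);
    return false;
}

// Claim for an unwatch: always join the attention queue; if a poll is in
// progress it must be interrupted so the queue can make progress.
bool claim_for_unwatch(lock_state &s, int node)
{
    s.attn_waitqueue.push_back(node);
    if (s.attn_waitqueue.front() == node) {
        return true;                        // lock acquired immediately
    }
    if (s.long_poll_running) {
        // loop_mech.interrupt_wait();      // wake the polling thread
    }
    return false;                           // caller waits until it reaches the head
}

int main()
{
    lock_state s;
    bool got_poll = claim_for_poll(s, 1);        // true: queue was empty
    bool got_unwatch = claim_for_unwatch(s, 2);  // false: must wait, poll interrupted
    return (got_poll && !got_unwatch) ? 0 : 1;
}
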
@@ -1017,6 +1071,11 @@ class event_loop
         interrupt_if_necessary();
     }
 
+    void release_watcher(base_watcher *watcher) noexcept
+    {
+        loop_mech.release_watcher(watcher);
+    }
+
     // Interrupt the current poll-waiter, if necessary - that is, if the loop is multi-thread safe, and if
     // there is currently another thread polling the backend event mechanism.
     void interrupt_if_necessary()
@@ -1032,19 +1091,48 @@ class event_loop
 
     // Acquire the attention lock (when held, ensures that no thread is polling the AEN
     // mechanism). This can be used to safely remove watches, since it is certain that
-    // notification callbacks won't be run while the attention lock is held.
+    // notification callbacks won't be run while the attention lock is held. Any in-progress
+    // poll will be interrupted so that the lock should be acquired quickly.
     void get_attn_lock(waitqueue_node<T_Mutex> &qnode) noexcept
     {
         std::unique_lock<T_Mutex> ulock(wait_lock);
         attn_waitqueue.queue(&qnode);        
         if (! attn_waitqueue.check_head(qnode)) {
-            loop_mech.interrupt_wait();
+            if (long_poll_running) {
+                // We want to interrupt any in-progress poll so that the attn queue will progress
+                // but we don't want to do that unnecessarily. If we are 2nd in the queue then the
+                // head must be doing the poll; interrupt it. Otherwise, we assume the 2nd has
+                // already interrupted it.
+                if (attn_waitqueue.get_second() == &qnode) {
+                    loop_mech.interrupt_wait();
+                }
+            }
             while (! attn_waitqueue.check_head(qnode)) {
                 qnode.wait(ulock);
             }
         }
     }
     
+    // Acquire the attention lock, but without interrupting any poll that's in progress
+    // (prefer to fail in that case).
+    bool poll_attn_lock(waitqueue_node<T_Mutex> &qnode) noexcept
+    {
+        std::unique_lock<T_Mutex> ulock(wait_lock);
+        if (long_poll_running) {
+            // There are poll-waiters, bail out
+            return false;
+        }
+
+        // Nobody's doing a long poll, wait until we're at the head of the attn queue and return
+        // success:
+        attn_waitqueue.queue(&qnode);
+        while (! attn_waitqueue.check_head(qnode)) {
+            qnode.wait(ulock);
+        }
+
+        return true;
+    }
+
     // Acquire the poll-wait lock (to be held when polling the AEN mechanism; lower priority than
     // the attention lock). The poll-wait lock is used to prevent more than a single thread from
     // polling the event loop mechanism at a time; if this is not done, it is basically
@@ -1062,22 +1150,29 @@ class event_loop
         
         while (! attn_waitqueue.check_head(qnode)) {
             qnode.wait(ulock);
-        }    
+        }
+
+        long_poll_running = true;
     }
     
     // Release the poll-wait/attention lock.
     void release_lock(waitqueue_node<T_Mutex> &qnode) noexcept
     {
         std::unique_lock<T_Mutex> ulock(wait_lock);
+        long_poll_running = false;
         waitqueue_node<T_Mutex> * nhead = attn_waitqueue.unqueue();
         if (nhead != nullptr) {
+            // Someone else now owns the lock, signal them to wake them up
             nhead->signal();
         }
         else {
+            // Nobody is waiting in attn_waitqueue (the high-priority queue) so check in
+            // wait_waitqueue (the low-priority queue)
             if (! wait_waitqueue.is_empty()) {
                 auto nhead = wait_waitqueue.get_head();
                 wait_waitqueue.unqueue();
                 attn_waitqueue.queue(nhead);
+                long_poll_running = true;
                 nhead->signal();
             }
         }                
@@ -1098,112 +1193,114 @@ class event_loop
         // Note that signal watchers cannot (currently) be disarmed
     }
 
-    // Process rearm return for fd_watcher, including the primary watcher of a bidi_fd_watcher
-    rearm process_fd_rearm(base_fd_watcher * bfw, rearm rearm_type, bool is_multi_watch) noexcept
+    // Process rearm return from an fd_watcher, including the primary watcher of a bidi_fd_watcher.
+    // Depending on the rearm value, we re-arm, remove, or disarm the watcher, etc.
+    rearm process_fd_rearm(base_fd_watcher * bfw, rearm rearm_type) noexcept
     {
         bool emulatedfd = static_cast<base_watcher *>(bfw)->emulatefd;
 
+        if (emulatedfd) {
+            if (rearm_type == rearm::REARM) {
+                bfw->emulate_enabled = true;
+                rearm_type = rearm::REQUEUE;
+            }
+            else if (rearm_type == rearm::DISARM) {
+                bfw->emulate_enabled = false;
+            }
+            else if (rearm_type == rearm::NOOP) {
+                if (bfw->emulate_enabled) {
+                    rearm_type = rearm::REQUEUE;
+                }
+            }
+        }
+        else  if (rearm_type == rearm::REARM) {
+            set_fd_enabled_nolock(bfw, bfw->watch_fd,
+                    bfw->watch_flags & (IN_EVENTS | OUT_EVENTS), true);
+        }
+        else if (rearm_type == rearm::DISARM) {
+            loop_mech.disable_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags);
+        }
+        else if (rearm_type == rearm::REMOVE) {
+            loop_mech.remove_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags);
+        }
+        return rearm_type;
+    }
+
+    // Process rearm option from the primary watcher in bidi_fd_watcher
+    rearm process_primary_rearm(base_bidi_fd_watcher * bdfw, rearm rearm_type) noexcept
+    {
+        bool emulatedfd = static_cast<base_watcher *>(bdfw)->emulatefd;
+
         // Called with lock held
-        if (is_multi_watch) {
-            base_bidi_fd_watcher * bdfw = static_cast<base_bidi_fd_watcher *>(bfw);
+        if (rearm_type == rearm::REMOVE) {
+            bdfw->read_removed = 1;
 
-            if (rearm_type == rearm::REMOVE) {
-                bdfw->read_removed = 1;
-                
-                if (backend_traits_t::has_separate_rw_fd_watches) {
-                    bdfw->watch_flags &= ~IN_EVENTS;
-                    if (! emulatedfd) {
-                        loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS);
-                    }
-                    return bdfw->write_removed ? rearm::REMOVE : rearm::NOOP;
+            if (backend_traits_t::has_separate_rw_fd_watches) {
+                bdfw->watch_flags &= ~IN_EVENTS;
+                if (! emulatedfd) {
+                    loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS);
                 }
-                else {
-                    if (! bdfw->write_removed) {
-                        if (bdfw->watch_flags & IN_EVENTS) {
-                            bdfw->watch_flags &= ~IN_EVENTS;
-                            if (! emulatedfd) {
-                                set_fd_enabled_nolock(bdfw, bdfw->watch_fd, bdfw->watch_flags,
-                                        bdfw->watch_flags != 0);
-                            }
-                        }
-                        return rearm::NOOP;
-                    }
-                    else {
-                        // both removed: actually remove
+                return bdfw->write_removed ? rearm::REMOVE : rearm::NOOP;
+            }
+            else {
+                if (! bdfw->write_removed) {
+                    if (bdfw->watch_flags & IN_EVENTS) {
+                        bdfw->watch_flags &= ~IN_EVENTS;
                         if (! emulatedfd) {
-                            loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, 0 /* not used */);
+                            set_fd_enabled_nolock(bdfw, bdfw->watch_fd, bdfw->watch_flags,
+                                    bdfw->watch_flags != 0);
                         }
-                        return rearm::REMOVE;
                     }
+                    return rearm::NOOP;
                 }
-            }
-            else if (rearm_type == rearm::DISARM) {
-                bdfw->watch_flags &= ~IN_EVENTS;
-
-                if (! emulatedfd) {
-                    if (! backend_traits_t::has_separate_rw_fd_watches) {
-                        int watch_flags = bdfw->watch_flags  & (IN_EVENTS | OUT_EVENTS);
-                        set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags, watch_flags != 0);
-                    }
-                    else {
-                        loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS);
+                else {
+                    // both removed: actually remove
+                    if (! emulatedfd) {
+                        loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, 0 /* not used */);
                     }
+                    return rearm::REMOVE;
                 }
             }
-            else if (rearm_type == rearm::REARM) {
-                if (! emulatedfd) {
-                    bdfw->watch_flags |= IN_EVENTS;
-                    if (! backend_traits_t::has_separate_rw_fd_watches) {
-                        int watch_flags = bdfw->watch_flags;
-                        set_fd_enabled_nolock(bdfw, bdfw->watch_fd,
-                                watch_flags & (IN_EVENTS | OUT_EVENTS), true);
-                    }
-                    else {
-                        set_fd_enabled_nolock(bdfw, bdfw->watch_fd, IN_EVENTS, true);
-                    }
+        }
+        else if (rearm_type == rearm::DISARM) {
+            bdfw->watch_flags &= ~IN_EVENTS;
+
+            if (! emulatedfd) {
+                if (! backend_traits_t::has_separate_rw_fd_watches) {
+                    int watch_flags = bdfw->watch_flags  & (IN_EVENTS | OUT_EVENTS);
+                    set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags, watch_flags != 0);
                 }
                 else {
-                    bdfw->watch_flags &= ~IN_EVENTS;
-                    rearm_type = rearm::REQUEUE;
+                    loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS);
                 }
             }
-            else if (rearm_type == rearm::NOOP) {
-                if (bdfw->emulatefd) {
-                    if (bdfw->watch_flags & IN_EVENTS) {
-                        bdfw->watch_flags &= ~IN_EVENTS;
-                        rearm_type = rearm::REQUEUE;
-                    }
-                }
-            }
-            return rearm_type;
         }
-        else { // Not multi-watch:
-            if (emulatedfd) {
-                if (rearm_type == rearm::REARM) {
-                    bfw->emulate_enabled = true;
-                    rearm_type = rearm::REQUEUE;
-                }
-                else if (rearm_type == rearm::DISARM) {
-                    bfw->emulate_enabled = false;
+        else if (rearm_type == rearm::REARM) {
+            if (! emulatedfd) {
+                bdfw->watch_flags |= IN_EVENTS;
+                if (! backend_traits_t::has_separate_rw_fd_watches) {
+                    int watch_flags = bdfw->watch_flags;
+                    set_fd_enabled_nolock(bdfw, bdfw->watch_fd,
+                            watch_flags & (IN_EVENTS | OUT_EVENTS), true);
                 }
-                else if (rearm_type == rearm::NOOP) {
-                    if (bfw->emulate_enabled) {
-                        rearm_type = rearm::REQUEUE;
-                    }
+                else {
+                    set_fd_enabled_nolock(bdfw, bdfw->watch_fd, IN_EVENTS, true);
                 }
             }
-            else  if (rearm_type == rearm::REARM) {
-                set_fd_enabled_nolock(bfw, bfw->watch_fd,
-                        bfw->watch_flags & (IN_EVENTS | OUT_EVENTS), true);
-            }
-            else if (rearm_type == rearm::DISARM) {
-                loop_mech.disable_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags);
+            else {
+                bdfw->watch_flags &= ~IN_EVENTS;
+                rearm_type = rearm::REQUEUE;
             }
-            else if (rearm_type == rearm::REMOVE) {
-                loop_mech.remove_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags);
+        }
+        else if (rearm_type == rearm::NOOP) {
+            if (bdfw->emulatefd) {
+                if (bdfw->watch_flags & IN_EVENTS) {
+                    bdfw->watch_flags &= ~IN_EVENTS;
+                    rearm_type = rearm::REQUEUE;
+                }
             }
-            return rearm_type;
         }
+        return rearm_type;
     }
 
     // Process re-arm for the secondary (output) watcher in a Bi-direction Fd watcher.
@@ -1393,9 +1490,10 @@ class event_loop
     void poll(int limit = -1) noexcept
     {
         waitqueue_node<T_Mutex> qnode;
-        get_pollwait_lock(qnode);
-        loop_mech.pull_events(false);
-        release_lock(qnode);
+        if (poll_attn_lock(qnode)) {
+            loop_mech.pull_events(false);
+            release_lock(qnode);
+        }
 
         process_events(limit);
     }
@@ -1665,7 +1763,7 @@ class fd_watcher_impl : public fd_watcher<EventLoop>
                 rearm_type = rearm::REMOVE;
             }
 
-            rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type, false);
+            rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type);
 
             post_dispatch(loop, this, rearm_type);
         }
@@ -1869,9 +1967,10 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher<EventLoop>
                 rearm_type = rearm::REMOVE;
             }
 
-            rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type, true);
+            rearm_type = loop_access::process_primary_rearm(loop, this, rearm_type);
 
-            post_dispatch(loop, this, rearm_type);
+            auto &outwatcher = bidi_fd_watcher<EventLoop>::out_watcher;
+            post_dispatch(loop, this, &outwatcher, rearm_type);
         }
     }
 
@@ -1900,7 +1999,7 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher<EventLoop>
                 post_dispatch(loop, &outwatcher, rearm_type);
             }
             else {
-                post_dispatch(loop, this, rearm_type);
+                post_dispatch(loop, this, &outwatcher, rearm_type);
             }
         }
     }