From 191dbfdb114d0c8a16547f9e0c1f2ea428e0a132 Mon Sep 17 00:00:00 2001 From: Davin McCall Date: Sun, 22 Dec 2019 20:28:42 +1000 Subject: [PATCH] Updated bundled Dasynq to 1.1.7 This fixes a slow memory leak. --- src/dasynq/dasynq-btree_set.h | 7 +- src/dasynq/dasynq-daryheap.h | 118 ++++++++----- src/dasynq/dasynq-posixtimer.h | 4 +- src/dasynq/dasynq-pselect.h | 3 +- src/dasynq/dasynq-select.h | 1 - src/dasynq/dasynq-stableheap.h | 3 +- src/dasynq/dasynq-svec.h | 1 - src/dasynq/dasynq.h | 307 ++++++++++++++++++++++----------- 8 files changed, 286 insertions(+), 158 deletions(-) diff --git a/src/dasynq/dasynq-btree_set.h b/src/dasynq/dasynq-btree_set.h index bc547a2..6b54a7d 100644 --- a/src/dasynq/dasynq-btree_set.h +++ b/src/dasynq/dasynq-btree_set.h @@ -2,6 +2,7 @@ #define DASYNQ_BTREE_SET_H #include +#include namespace dasynq { @@ -87,7 +88,7 @@ class btree_set { union nodedata_u { - T data; // TODO this should be obscured to avoid early construction + T data; nodedata_u() {} }; @@ -296,10 +297,10 @@ class btree_set } // Allocate a slot, but do not incorporate into the heap: - template void allocate(handle_t &hn, U... u) + template void allocate(handle_t &hn, U&&... u) { alloc_slot(); - new (& hn.nodedata.data) T(u...); + new (& hn.nodedata.data) T(std::forward(u)...); } void deallocate(handle_t & hn) noexcept diff --git a/src/dasynq/dasynq-daryheap.h b/src/dasynq/dasynq-daryheap.h index bf98c7d..d824137 100644 --- a/src/dasynq/dasynq-daryheap.h +++ b/src/dasynq/dasynq-daryheap.h @@ -1,22 +1,46 @@ -#ifndef DASYNC_DARYHEAP_H_INCLUDED -#define DASYNC_DARYHEAP_H_INCLUDED +#ifndef DASYNQ_DARYHEAP_H_INCLUDED +#define DASYNQ_DARYHEAP_H_INCLUDED -#include "dasynq-svec.h" #include #include +#include #include +#include "dasynq-svec.h" + + namespace dasynq { /** - * Priority queue implementation based on a binary heap. + * Priority queue implementation based on a heap with parameterised fan-out. All nodes are stored + * in a vector, with the root at position 0, and each node has N child nodes, at positions + * (p * N + 1) through (p * N + N) where p is the (parent) node position. + * + * With N=2, this is a binary heap. Higher values of N may give better performance due to better + * cache locality, but also increase fan-out which will (if too high) also reduce performance. + * + * The destructor will not clean up (destruct) objects that have been added to the queue. If the + * destructor of the element type (T) is non-trivial, all handles should be de-allocated before + * destroying the queue. * - * Heap entry "handles" maintain an index into the heap. When the position of a node in the heap - * changes, its handle must be updated. + * Implementation details: + * + * Adding a node returns a "handle", which maintains an index into the heap. When the position of + * a node in the heap changes, its handle must be updated (the advantage is that changing priority + * of or removing a node does not require a linear search for the node). + * + * Node data is actually stored as part of the handle, not in the queue. + * + * To add a node to the queue, it is inserted at the end and then "bubbled down" to its correct + * location according to priority. To removing a node, the node is replaced with the last node in + * the vector and then that is "bubbled up" to the correct position. + * + * Parameters: * * T : node data type * P : priority type (eg int) * Compare : functional object type to compare priorities + * N : fan out factor (number of child nodes per node) */ template , int N = 4> class dary_heap @@ -27,14 +51,17 @@ class dary_heap private: + static_assert(std::is_nothrow_move_assignable

::value, "P must be no-except move assignable"); + // Actual heap node class heap_node { public: - P data; - handle_t * hnd_p; + P prio; + handle_t * hnd; - heap_node(handle_t * hnd, const P &odata) : data(odata), hnd_p(hnd) + heap_node(handle_t * hnd_p, const P &prio_p) noexcept(std::is_nothrow_copy_constructible

::value) + : prio(prio_p), hnd(hnd_p) { // nothing to do } @@ -81,31 +108,30 @@ class dary_heap private: - // Bubble a newly added timer down to the correct position + // Bubble a newly added node down to the correct position bool bubble_down(hindex_t pos) noexcept { - handle_t * ohndl = hvec[pos].hnd_p; - P op = hvec[pos].data; + handle_t * ohndl = hvec[pos].hnd; + P op = hvec[pos].prio; return bubble_down(pos, ohndl, op); } bool bubble_down(hindex_t pos, handle_t * ohndl, const P &op) noexcept { - // int pos = v.size() - 1; Compare lt; while (pos > 0) { hindex_t parent = (pos - 1) / N; - if (! lt(op, hvec[parent].data)) { + if (! lt(op, hvec[parent].prio)) { break; } - hvec[pos] = hvec[parent]; - hvec[pos].hnd_p->heap_index = pos; + hvec[pos] = std::move(hvec[parent]); + hvec[pos].hnd->heap_index = pos; pos = parent; } - hvec[pos].hnd_p = ohndl; - hvec[pos].data = op; + hvec[pos].hnd = ohndl; + hvec[pos].prio = std::move(op); ohndl->heap_index = pos; return pos == 0; @@ -113,8 +139,8 @@ class dary_heap void bubble_up(hindex_t pos = 0) noexcept { - P p = hvec[pos].data; - handle_t &h = *(hvec[pos].hnd_p); + P p = hvec[pos].prio; + handle_t &h = *(hvec[pos].hnd); bubble_up(pos, h, p); } @@ -129,34 +155,35 @@ class dary_heap hindex_t max = (rmax - 1) / N; while (pos <= max) { + // Find (select) the smallest child node hindex_t lchild = pos * N + 1; hindex_t selchild = lchild; hindex_t rchild = std::min(lchild + N, rmax); for (hindex_t i = lchild + 1; i < rchild; i++) { - if (lt(hvec[i].data, hvec[selchild].data)) { + if (lt(hvec[i].prio, hvec[selchild].prio)) { selchild = i; } } - if (! lt(hvec[selchild].data, p)) { + if (! lt(hvec[selchild].prio, p)) { break; } - hvec[pos] = hvec[selchild]; - hvec[pos].hnd_p->heap_index = pos; + hvec[pos] = std::move(hvec[selchild]); + hvec[pos].hnd->heap_index = pos; pos = selchild; } - hvec[pos].hnd_p = &h; - hvec[pos].data = p; + hvec[pos].hnd = &h; + hvec[pos].prio = std::move(p); h.heap_index = pos; } void remove_h(hindex_t hidx) noexcept { - hvec[hidx].hnd_p->heap_index = -1; + hvec[hidx].hnd->heap_index = -1; if (hvec.size() != hidx + 1) { - bubble_up(hidx, *(hvec.back().hnd_p), hvec.back().data); + bubble_up(hidx, *(hvec.back().hnd), hvec.back().prio); hvec.pop_back(); } else { @@ -173,12 +200,13 @@ class dary_heap // Allocate a slot, but do not incorporate into the heap: // u... : parameters for data constructor T::T(...) - template void allocate(handle_t & hnd, U... u) + template void allocate(handle_t & hnd, U&&... u) { - new (& hnd.hd_u.hd) T(u...); + new (& hnd.hd_u.hd) T(std::forward(u)...); hnd.heap_index = -1; - constexpr hindex_t max_allowed = std::numeric_limits::is_signed ? - std::numeric_limits::max() : ((hindex_t) - 2); + + // largest object size is PTRDIFF_MAX, so we expect the largest vector is that / sizeof node: + constexpr hindex_t max_allowed = (std::numeric_limits::max() - 1) / sizeof(heap_node); if (num_nodes == max_allowed) { throw std::bad_alloc(); @@ -224,57 +252,57 @@ class dary_heap bool insert(handle_t & hnd, const P &pval) noexcept { hnd.heap_index = hvec.size(); - //hvec.emplace_back(&hnd, pval); + // emplace an empty node; data/prio will be stored via bubble_down. hvec.emplace_back(); return bubble_down(hvec.size() - 1, &hnd, pval); } // Get the root node handle. (Returns a handle_t or reference to handle_t). - handle_t & get_root() + handle_t & get_root() noexcept { - return * hvec[0].hnd_p; + return * hvec[0].hnd; } - P &get_root_priority() + P &get_root_priority() noexcept { - return hvec[0].data; + return hvec[0].prio; } - void pull_root() + void pull_root() noexcept { remove_h(0); } - void remove(handle_t & hnd) + void remove(handle_t & hnd) noexcept { remove_h(hnd.heap_index); } - bool empty() + bool empty() noexcept { return hvec.empty(); } - bool is_queued(handle_t & hnd) + bool is_queued(handle_t & hnd) noexcept { return hnd.heap_index != (hindex_t) -1; } // Set a node priority. Returns true iff the node becomes the root node (and wasn't before). - bool set_priority(handle_t & hnd, const P& p) + bool set_priority(handle_t & hnd, const P& p) noexcept { int heap_index = hnd.heap_index; Compare lt; - if (lt(hvec[heap_index].data, p)) { + if (lt(hvec[heap_index].prio, p)) { // Increase key - hvec[heap_index].data = p; + hvec[heap_index].prio = p; bubble_up(heap_index); return false; } else { // Decrease key - hvec[heap_index].data = p; + hvec[heap_index].prio = p; return bubble_down(heap_index); } } diff --git a/src/dasynq/dasynq-posixtimer.h b/src/dasynq/dasynq-posixtimer.h index 17816c3..30eb590 100644 --- a/src/dasynq/dasynq-posixtimer.h +++ b/src/dasynq/dasynq-posixtimer.h @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -70,7 +71,7 @@ class posix_timer_events : public timer_base } } - timer_t &timer_for_clock(clock_type clock) + timer_t &timer_for_clock(clock_type clock) noexcept { switch (clock) { case clock_type::MONOTONIC: @@ -79,6 +80,7 @@ class posix_timer_events : public timer_base return real_timer; default: DASYNQ_UNREACHABLE; + std::abort(); } } diff --git a/src/dasynq/dasynq-pselect.h b/src/dasynq/dasynq-pselect.h index f4a5157..b40b1bf 100644 --- a/src/dasynq/dasynq-pselect.h +++ b/src/dasynq/dasynq-pselect.h @@ -262,8 +262,7 @@ template class pselect_events : public signal_events this->sigmaskf(SIG_SETMASK, &sigmask, &origmask); this->sigmaskf(SIG_SETMASK, &origmask, nullptr); } - - if (r == 0 && do_wait) { + else { // timeout: Base::lock.lock(); this->process_monotonic_timers(); diff --git a/src/dasynq/dasynq-select.h b/src/dasynq/dasynq-select.h index e0ce78a..4c96e2c 100644 --- a/src/dasynq/dasynq-select.h +++ b/src/dasynq/dasynq-select.h @@ -4,7 +4,6 @@ #include #include #include -#include #include #include diff --git a/src/dasynq/dasynq-stableheap.h b/src/dasynq/dasynq-stableheap.h index af4ea7b..0747226 100644 --- a/src/dasynq/dasynq-stableheap.h +++ b/src/dasynq/dasynq-stableheap.h @@ -9,6 +9,7 @@ // The generation counter is a 64-bit integer and can not realistically overflow. #include +#include namespace dasynq { @@ -73,7 +74,7 @@ class stable_heap : private H,compare_stable_prio> template void allocate(handle_t & hnd, U&& ...u) { - Base::allocate(hnd, u...); + Base::allocate(hnd, std::forward(u)...); } static void init_handle(handle_t &hndl) diff --git a/src/dasynq/dasynq-svec.h b/src/dasynq/dasynq-svec.h index b8cc7f7..77039ec 100644 --- a/src/dasynq/dasynq-svec.h +++ b/src/dasynq/dasynq-svec.h @@ -21,7 +21,6 @@ class svector T elem; vec_node() { } - ~vec_node() { } }; vec_node * array; diff --git a/src/dasynq/dasynq.h b/src/dasynq/dasynq.h index e65bad6..bf9706e 100644 --- a/src/dasynq/dasynq.h +++ b/src/dasynq/dasynq.h @@ -242,6 +242,11 @@ namespace dprivate { return nullptr; } + waitqueue_node * get_second() + { + return nullptr; + } + bool check_head(waitqueue_node &node) { return true; @@ -278,6 +283,11 @@ namespace dprivate { return head; } + waitqueue_node * get_second() + { + return head->next; + } + bool check_head(waitqueue_node &node) { return head == &node; @@ -312,9 +322,16 @@ namespace dprivate { template static rearm process_fd_rearm(Loop &loop, typename Loop::base_fd_watcher *bfw, - rearm rearm_type, bool is_multi_watch) noexcept + rearm rearm_type) noexcept { - return loop.process_fd_rearm(bfw, rearm_type, is_multi_watch); + return loop.process_fd_rearm(bfw, rearm_type); + } + + template + static rearm process_primary_rearm(Loop &loop, typename Loop::base_bidi_fd_watcher *bdfw, + rearm rearm_type) noexcept + { + return loop.process_primary_rearm(bdfw, rearm_type); } template @@ -350,14 +367,22 @@ namespace dprivate { { loop.requeue_watcher(watcher); } + + template + static void release_watcher(Loop &loop, base_watcher *watcher) noexcept + { + loop.release_watcher(watcher); + } }; // Do standard post-dispatch processing for a watcher. This handles the case of removing or - // re-queueing watchers depending on the rearm type. + // re-queueing watchers depending on the rearm type. This is called from the individual + // watcher dispatch functions to handle REMOVE or REQUEUE re-arm values. template void post_dispatch(Loop &loop, base_watcher *watcher, rearm rearm_type) { if (rearm_type == rearm::REMOVE) { loop_access::get_base_lock(loop).unlock(); + loop_access::release_watcher(loop, watcher); watcher->watch_removed(); loop_access::get_base_lock(loop).lock(); } @@ -366,15 +391,33 @@ namespace dprivate { } } - // This class serves as the base class (mixin) for the backend mechanism. + // Post-dispatch handling for bidi fd watchers. + template void post_dispatch(Loop &loop, bidi_fd_watcher *bdfd_watcher, + base_watcher *out_watcher, rearm rearm_type) + { + base_watcher *watcher = (base_watcher *)bdfd_watcher; + if (rearm_type == rearm::REMOVE) { + loop_access::get_base_lock(loop).unlock(); + loop_access::release_watcher(loop, watcher); + loop_access::release_watcher(loop, out_watcher); + watcher->watch_removed(); + loop_access::get_base_lock(loop).lock(); + } + else if (rearm_type == rearm::REQUEUE) { + loop_access::requeue_watcher(loop, watcher); + } + } + + // The event_dispatch class serves as the base class (mixin) for the backend mechanism. It + // mostly manages queing and dequeing of events and maintains/owns the relevant data + // structures, including a mutex lock. // - // The event_dispatch class maintains the queued event data structures. It inserts watchers - // into the queue when events are received (receiveXXX methods). It also owns a mutex used - // to protect those structures. + // The backend mechanism should call one of the receiveXXX functions to notify of an event + // received. The watcher will then be queued. // - // In general the methods should be called with lock held. In practice this means that the - // event loop backend implementations must obtain the lock; they are also free to use it to - // protect their own internal data structures. + // In general the functions should be called with lock held. In practice this means that the + // event loop backend implementations (that deposit received events here) must obtain the + // lock; they are also free to use it to protect their own internal data structures. template class event_dispatch { friend class dasynq::event_loop;; @@ -528,7 +571,6 @@ namespace dprivate { // If the watcher is active, set deleteme true; the watcher will be removed // at the end of current processing (i.e. when active is set false). watcher->deleteme = true; - release_watcher(watcher); lock.unlock(); } else { @@ -668,15 +710,27 @@ class event_loop // So, we use two wait queues protected by a single mutex. The "attn_waitqueue" // (attention queue) is the high-priority queue, used for threads wanting to // unwatch event sources. The "wait_waitquueue" is the queue used by threads - // that wish to actually poll for events. + // that wish to actually poll for events, while they are waiting for the main + // queue to become quiet. // - The head of the "attn_waitqueue" is always the holder of the lock // - Therefore, a poll-waiter must be moved from the wait_waitqueue to the // attn_waitqueue to actually gain the lock. This is only done if the // attn_waitqueue is otherwise empty. // - The mutex only protects manipulation of the wait queues, and so should not // be highly contended. + // + // To claim the lock for a poll-wait, the procedure is: + // - check if the attn_waitqueue is empty; + // - if it is, insert node at the head, thus claiming the lock, and return + // - otherwise, insert node in the wait_waitqueue, and wait + // To claim the lock for an unwatch, the procedure is: + // - insert node in the attn_waitqueue + // - if the node is at the head of the queue, lock is claimed; return + // - otherwise, if a poll is in progress, interrupt it + // - wait until our node is at the head of the attn_waitqueue mutex_t wait_lock; // protects the wait/attention queues + bool long_poll_running = false; // whether any thread is polling the backend (with non-zero timeout) waitqueue attn_waitqueue; waitqueue wait_waitqueue; @@ -1017,6 +1071,11 @@ class event_loop interrupt_if_necessary(); } + void release_watcher(base_watcher *watcher) noexcept + { + loop_mech.release_watcher(watcher); + } + // Interrupt the current poll-waiter, if necessary - that is, if the loop is multi-thread safe, and if // there is currently another thread polling the backend event mechanism. void interrupt_if_necessary() @@ -1032,19 +1091,48 @@ class event_loop // Acquire the attention lock (when held, ensures that no thread is polling the AEN // mechanism). This can be used to safely remove watches, since it is certain that - // notification callbacks won't be run while the attention lock is held. + // notification callbacks won't be run while the attention lock is held. Any in-progress + // poll will be interrupted so that the lock should be acquired quickly. void get_attn_lock(waitqueue_node &qnode) noexcept { std::unique_lock ulock(wait_lock); attn_waitqueue.queue(&qnode); if (! attn_waitqueue.check_head(qnode)) { - loop_mech.interrupt_wait(); + if (long_poll_running) { + // We want to interrupt any in-progress poll so that the attn queue will progress + // but we don't want to do that unnecessarily. If we are 2nd in the queue then the + // head must be doing the poll; interrupt it. Otherwise, we assume the 2nd has + // already interrupted it. + if (attn_waitqueue.get_second() == &qnode) { + loop_mech.interrupt_wait(); + } + } while (! attn_waitqueue.check_head(qnode)) { qnode.wait(ulock); } } } + // Acquire the attention lock, but without interrupting any poll that's in progress + // (prefer to fail in that case). + bool poll_attn_lock(waitqueue_node &qnode) noexcept + { + std::unique_lock ulock(wait_lock); + if (long_poll_running) { + // There are poll-waiters, bail out + return false; + } + + // Nobody's doing a long poll, wait until we're at the head of the attn queue and return + // success: + attn_waitqueue.queue(&qnode); + while (! attn_waitqueue.check_head(qnode)) { + qnode.wait(ulock); + } + + return true; + } + // Acquire the poll-wait lock (to be held when polling the AEN mechanism; lower priority than // the attention lock). The poll-wait lock is used to prevent more than a single thread from // polling the event loop mechanism at a time; if this is not done, it is basically @@ -1062,22 +1150,29 @@ class event_loop while (! attn_waitqueue.check_head(qnode)) { qnode.wait(ulock); - } + } + + long_poll_running = true; } // Release the poll-wait/attention lock. void release_lock(waitqueue_node &qnode) noexcept { std::unique_lock ulock(wait_lock); + long_poll_running = false; waitqueue_node * nhead = attn_waitqueue.unqueue(); if (nhead != nullptr) { + // Someone else now owns the lock, signal them to wake them up nhead->signal(); } else { + // Nobody is waiting in attn_waitqueue (the high-priority queue) so check in + // wait_waitqueue (the low-priority queue) if (! wait_waitqueue.is_empty()) { auto nhead = wait_waitqueue.get_head(); wait_waitqueue.unqueue(); attn_waitqueue.queue(nhead); + long_poll_running = true; nhead->signal(); } } @@ -1098,112 +1193,114 @@ class event_loop // Note that signal watchers cannot (currently) be disarmed } - // Process rearm return for fd_watcher, including the primary watcher of a bidi_fd_watcher - rearm process_fd_rearm(base_fd_watcher * bfw, rearm rearm_type, bool is_multi_watch) noexcept + // Process rearm return from an fd_watcher, including the primary watcher of a bidi_fd_watcher. + // Depending on the rearm value, we re-arm, remove, or disarm the watcher, etc. + rearm process_fd_rearm(base_fd_watcher * bfw, rearm rearm_type) noexcept { bool emulatedfd = static_cast(bfw)->emulatefd; + if (emulatedfd) { + if (rearm_type == rearm::REARM) { + bfw->emulate_enabled = true; + rearm_type = rearm::REQUEUE; + } + else if (rearm_type == rearm::DISARM) { + bfw->emulate_enabled = false; + } + else if (rearm_type == rearm::NOOP) { + if (bfw->emulate_enabled) { + rearm_type = rearm::REQUEUE; + } + } + } + else if (rearm_type == rearm::REARM) { + set_fd_enabled_nolock(bfw, bfw->watch_fd, + bfw->watch_flags & (IN_EVENTS | OUT_EVENTS), true); + } + else if (rearm_type == rearm::DISARM) { + loop_mech.disable_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags); + } + else if (rearm_type == rearm::REMOVE) { + loop_mech.remove_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags); + } + return rearm_type; + } + + // Process rearm option from the primary watcher in bidi_fd_watcher + rearm process_primary_rearm(base_bidi_fd_watcher * bdfw, rearm rearm_type) noexcept + { + bool emulatedfd = static_cast(bdfw)->emulatefd; + // Called with lock held - if (is_multi_watch) { - base_bidi_fd_watcher * bdfw = static_cast(bfw); + if (rearm_type == rearm::REMOVE) { + bdfw->read_removed = 1; - if (rearm_type == rearm::REMOVE) { - bdfw->read_removed = 1; - - if (backend_traits_t::has_separate_rw_fd_watches) { - bdfw->watch_flags &= ~IN_EVENTS; - if (! emulatedfd) { - loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS); - } - return bdfw->write_removed ? rearm::REMOVE : rearm::NOOP; + if (backend_traits_t::has_separate_rw_fd_watches) { + bdfw->watch_flags &= ~IN_EVENTS; + if (! emulatedfd) { + loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS); } - else { - if (! bdfw->write_removed) { - if (bdfw->watch_flags & IN_EVENTS) { - bdfw->watch_flags &= ~IN_EVENTS; - if (! emulatedfd) { - set_fd_enabled_nolock(bdfw, bdfw->watch_fd, bdfw->watch_flags, - bdfw->watch_flags != 0); - } - } - return rearm::NOOP; - } - else { - // both removed: actually remove + return bdfw->write_removed ? rearm::REMOVE : rearm::NOOP; + } + else { + if (! bdfw->write_removed) { + if (bdfw->watch_flags & IN_EVENTS) { + bdfw->watch_flags &= ~IN_EVENTS; if (! emulatedfd) { - loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, 0 /* not used */); + set_fd_enabled_nolock(bdfw, bdfw->watch_fd, bdfw->watch_flags, + bdfw->watch_flags != 0); } - return rearm::REMOVE; } + return rearm::NOOP; } - } - else if (rearm_type == rearm::DISARM) { - bdfw->watch_flags &= ~IN_EVENTS; - - if (! emulatedfd) { - if (! backend_traits_t::has_separate_rw_fd_watches) { - int watch_flags = bdfw->watch_flags & (IN_EVENTS | OUT_EVENTS); - set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags, watch_flags != 0); - } - else { - loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS); + else { + // both removed: actually remove + if (! emulatedfd) { + loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, 0 /* not used */); } + return rearm::REMOVE; } } - else if (rearm_type == rearm::REARM) { - if (! emulatedfd) { - bdfw->watch_flags |= IN_EVENTS; - if (! backend_traits_t::has_separate_rw_fd_watches) { - int watch_flags = bdfw->watch_flags; - set_fd_enabled_nolock(bdfw, bdfw->watch_fd, - watch_flags & (IN_EVENTS | OUT_EVENTS), true); - } - else { - set_fd_enabled_nolock(bdfw, bdfw->watch_fd, IN_EVENTS, true); - } + } + else if (rearm_type == rearm::DISARM) { + bdfw->watch_flags &= ~IN_EVENTS; + + if (! emulatedfd) { + if (! backend_traits_t::has_separate_rw_fd_watches) { + int watch_flags = bdfw->watch_flags & (IN_EVENTS | OUT_EVENTS); + set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags, watch_flags != 0); } else { - bdfw->watch_flags &= ~IN_EVENTS; - rearm_type = rearm::REQUEUE; + loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS); } } - else if (rearm_type == rearm::NOOP) { - if (bdfw->emulatefd) { - if (bdfw->watch_flags & IN_EVENTS) { - bdfw->watch_flags &= ~IN_EVENTS; - rearm_type = rearm::REQUEUE; - } - } - } - return rearm_type; } - else { // Not multi-watch: - if (emulatedfd) { - if (rearm_type == rearm::REARM) { - bfw->emulate_enabled = true; - rearm_type = rearm::REQUEUE; - } - else if (rearm_type == rearm::DISARM) { - bfw->emulate_enabled = false; + else if (rearm_type == rearm::REARM) { + if (! emulatedfd) { + bdfw->watch_flags |= IN_EVENTS; + if (! backend_traits_t::has_separate_rw_fd_watches) { + int watch_flags = bdfw->watch_flags; + set_fd_enabled_nolock(bdfw, bdfw->watch_fd, + watch_flags & (IN_EVENTS | OUT_EVENTS), true); } - else if (rearm_type == rearm::NOOP) { - if (bfw->emulate_enabled) { - rearm_type = rearm::REQUEUE; - } + else { + set_fd_enabled_nolock(bdfw, bdfw->watch_fd, IN_EVENTS, true); } } - else if (rearm_type == rearm::REARM) { - set_fd_enabled_nolock(bfw, bfw->watch_fd, - bfw->watch_flags & (IN_EVENTS | OUT_EVENTS), true); - } - else if (rearm_type == rearm::DISARM) { - loop_mech.disable_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags); + else { + bdfw->watch_flags &= ~IN_EVENTS; + rearm_type = rearm::REQUEUE; } - else if (rearm_type == rearm::REMOVE) { - loop_mech.remove_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags); + } + else if (rearm_type == rearm::NOOP) { + if (bdfw->emulatefd) { + if (bdfw->watch_flags & IN_EVENTS) { + bdfw->watch_flags &= ~IN_EVENTS; + rearm_type = rearm::REQUEUE; + } } - return rearm_type; } + return rearm_type; } // Process re-arm for the secondary (output) watcher in a Bi-direction Fd watcher. @@ -1393,9 +1490,10 @@ class event_loop void poll(int limit = -1) noexcept { waitqueue_node qnode; - get_pollwait_lock(qnode); - loop_mech.pull_events(false); - release_lock(qnode); + if (poll_attn_lock(qnode)) { + loop_mech.pull_events(false); + release_lock(qnode); + } process_events(limit); } @@ -1665,7 +1763,7 @@ class fd_watcher_impl : public fd_watcher rearm_type = rearm::REMOVE; } - rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type, false); + rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type); post_dispatch(loop, this, rearm_type); } @@ -1869,9 +1967,10 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher rearm_type = rearm::REMOVE; } - rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type, true); + rearm_type = loop_access::process_primary_rearm(loop, this, rearm_type); - post_dispatch(loop, this, rearm_type); + auto &outwatcher = bidi_fd_watcher::out_watcher; + post_dispatch(loop, this, &outwatcher, rearm_type); } } @@ -1900,7 +1999,7 @@ class bidi_fd_watcher_impl : public bidi_fd_watcher post_dispatch(loop, &outwatcher, rearm_type); } else { - post_dispatch(loop, this, rearm_type); + post_dispatch(loop, this, &outwatcher, rearm_type); } } } -- 2.25.1