-#ifndef DASYNC_DARYHEAP_H_INCLUDED
-#define DASYNC_DARYHEAP_H_INCLUDED
+#ifndef DASYNQ_DARYHEAP_H_INCLUDED
+#define DASYNQ_DARYHEAP_H_INCLUDED
-#include "dasynq-svec.h"
#include <type_traits>
#include <functional>
+#include <utility>
#include <limits>
+#include "dasynq-svec.h"
+
+
namespace dasynq {
/**
- * Priority queue implementation based on a binary heap.
+ * Priority queue implementation based on a heap with parameterised fan-out. All nodes are stored
+ * in a vector, with the root at position 0, and each node has N child nodes, at positions
+ * (p * N + 1) through (p * N + N) where p is the (parent) node position.
+ *
+ * With N=2, this is a binary heap. Higher values of N reduce the depth of the tree and may
+ * give better performance due to better cache locality, but too high a fan-out increases
+ * the number of comparisons per level and will reduce performance.
+ *
+ * The destructor will not clean up (destruct) objects that have been added to the queue. If the
+ * destructor of the element type (T) is non-trivial, all handles should be de-allocated before
+ * destroying the queue.
*
- * Heap entry "handles" maintain an index into the heap. When the position of a node in the heap
- * changes, its handle must be updated.
+ * Implementation details:
+ *
+ * Adding a node returns a "handle", which maintains an index into the heap. When the position of
+ * a node in the heap changes, its handle must be updated (the advantage is that changing the
+ * priority of a node, or removing it, does not require a linear search for the node).
+ *
+ * Node data is actually stored as part of the handle, not in the queue.
+ *
+ * To add a node to the queue, it is inserted at the end and then "bubbled down" to its correct
+ * location according to priority. To remove a node, it is replaced with the last node in
+ * the vector and then that is "bubbled up" to the correct position.
+ *
+ * Parameters:
*
* T : node data type
* P : priority type (eg int)
* Compare : functional object type to compare priorities
+ * N : fan-out factor (number of child nodes per node)
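+ *
+ * For example, with the default fan-out of N=4, the children of the node at position p are
+ * at positions (4p + 1) through (4p + 4), and the parent of the node at position c is at
+ * position (c - 1) / 4.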
*/
template <typename T, typename P, typename Compare = std::less<P>, int N = 4>
class dary_heap
private:
+ static_assert(std::is_nothrow_move_assignable<P>::value, "P must be nothrow move assignable");
+
// Actual heap node
class heap_node
{
public:
- P data;
- handle_t * hnd_p;
+ P prio;
+ handle_t * hnd;
- heap_node(handle_t * hnd, const P &odata) : data(odata), hnd_p(hnd)
+ heap_node(handle_t * hnd_p, const P &prio_p) noexcept(std::is_nothrow_copy_constructible<P>::value)
+ : prio(prio_p), hnd(hnd_p)
{
// nothing to do
}
private:
- // Bubble a newly added timer down to the correct position
+ // Bubble a newly added node down to the correct position
bool bubble_down(hindex_t pos) noexcept
{
- handle_t * ohndl = hvec[pos].hnd_p;
- P op = hvec[pos].data;
+ handle_t * ohndl = hvec[pos].hnd;
+ P op = hvec[pos].prio;
return bubble_down(pos, ohndl, op);
}
bool bubble_down(hindex_t pos, handle_t * ohndl, const P &op) noexcept
{
- // int pos = v.size() - 1;
Compare lt;
while (pos > 0) {
hindex_t parent = (pos - 1) / N;
- if (! lt(op, hvec[parent].data)) {
+ if (! lt(op, hvec[parent].prio)) {
break;
}
- hvec[pos] = hvec[parent];
- hvec[pos].hnd_p->heap_index = pos;
+ hvec[pos] = std::move(hvec[parent]);
+ hvec[pos].hnd->heap_index = pos;
pos = parent;
}
- hvec[pos].hnd_p = ohndl;
- hvec[pos].data = op;
+ hvec[pos].hnd = ohndl;
+ hvec[pos].prio = op;  // note: op is a const reference, so it cannot actually be moved from
ohndl->heap_index = pos;
return pos == 0;
void bubble_up(hindex_t pos = 0) noexcept
{
- P p = hvec[pos].data;
- handle_t &h = *(hvec[pos].hnd_p);
+ P p = hvec[pos].prio;
+ handle_t &h = *(hvec[pos].hnd);
bubble_up(pos, h, p);
}
hindex_t max = (rmax - 1) / N;
while (pos <= max) {
+ // Find (select) the smallest child node
hindex_t lchild = pos * N + 1;
hindex_t selchild = lchild;
hindex_t rchild = std::min(lchild + N, rmax);
for (hindex_t i = lchild + 1; i < rchild; i++) {
- if (lt(hvec[i].data, hvec[selchild].data)) {
+ if (lt(hvec[i].prio, hvec[selchild].prio)) {
selchild = i;
}
}
- if (! lt(hvec[selchild].data, p)) {
+ if (! lt(hvec[selchild].prio, p)) {
break;
}
- hvec[pos] = hvec[selchild];
- hvec[pos].hnd_p->heap_index = pos;
+ hvec[pos] = std::move(hvec[selchild]);
+ hvec[pos].hnd->heap_index = pos;
pos = selchild;
}
- hvec[pos].hnd_p = &h;
- hvec[pos].data = p;
+ hvec[pos].hnd = &h;
+ hvec[pos].prio = p;
h.heap_index = pos;
}
void remove_h(hindex_t hidx) noexcept
{
- hvec[hidx].hnd_p->heap_index = -1;
+ hvec[hidx].hnd->heap_index = -1;
if (hvec.size() != hidx + 1) {
- bubble_up(hidx, *(hvec.back().hnd_p), hvec.back().data);
+ bubble_up(hidx, *(hvec.back().hnd), hvec.back().prio);
hvec.pop_back();
}
else {
// Allocate a slot, but do not incorporate into the heap:
// u... : parameters for data constructor T::T(...)
- template <typename ...U> void allocate(handle_t & hnd, U... u)
+ template <typename ...U> void allocate(handle_t & hnd, U&&... u)
{
- new (& hnd.hd_u.hd) T(u...);
+ new (& hnd.hd_u.hd) T(std::forward<U>(u)...);
hnd.heap_index = -1;
- constexpr hindex_t max_allowed = std::numeric_limits<hindex_t>::is_signed ?
- std::numeric_limits<hindex_t>::max() : ((hindex_t) - 2);
+
+ // The largest object size is PTRDIFF_MAX, so the largest possible vector holds
+ // PTRDIFF_MAX / sizeof(heap_node) nodes:
+ constexpr hindex_t max_allowed = (std::numeric_limits<ptrdiff_t>::max() - 1) / sizeof(heap_node);
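+ // (For example, with P = int on a typical 64-bit platform, sizeof(heap_node) is 16
+ // and PTRDIFF_MAX is 2^63 - 1, giving a limit of roughly 2^59 nodes.)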
if (num_nodes == max_allowed) {
throw std::bad_alloc();
bool insert(handle_t & hnd, const P &pval) noexcept
{
hnd.heap_index = hvec.size();
- //hvec.emplace_back(&hnd, pval);
+ // emplace an empty node; data/prio will be stored via bubble_down.
hvec.emplace_back();
return bubble_down(hvec.size() - 1, &hnd, pval);
}
// Get the root node handle. (Returns a handle_t or reference to handle_t).
- handle_t & get_root()
+ handle_t & get_root() noexcept
{
- return * hvec[0].hnd_p;
+ return * hvec[0].hnd;
}
- P &get_root_priority()
+ P &get_root_priority() noexcept
{
- return hvec[0].data;
+ return hvec[0].prio;
}
- void pull_root()
+ void pull_root() noexcept
{
remove_h(0);
}
- void remove(handle_t & hnd)
+ void remove(handle_t & hnd) noexcept
{
remove_h(hnd.heap_index);
}
- bool empty()
+ bool empty() noexcept
{
return hvec.empty();
}
- bool is_queued(handle_t & hnd)
+ bool is_queued(handle_t & hnd) noexcept
{
return hnd.heap_index != (hindex_t) -1;
}
// Set a node priority. Returns true iff the node becomes the root node (and wasn't before).
- bool set_priority(handle_t & hnd, const P& p)
+ bool set_priority(handle_t & hnd, const P& p) noexcept
{
int heap_index = hnd.heap_index;
Compare lt;
- if (lt(hvec[heap_index].data, p)) {
+ if (lt(hvec[heap_index].prio, p)) {
// Increase key
- hvec[heap_index].data = p;
+ hvec[heap_index].prio = p;
bubble_up(heap_index);
return false;
}
else {
// Decrease key
- hvec[heap_index].data = p;
+ hvec[heap_index].prio = p;
return bubble_down(heap_index);
}
}
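+
+ // Illustrative usage (a sketch only; assumes a default-constructed handle is acceptable,
+ // which may vary by configuration):
+ //
+ //     dary_heap<int, int> heap;
+ //     dary_heap<int, int>::handle_t hnd;
+ //     heap.allocate(hnd, 42);   // construct the node data (T) inside the handle
+ //     heap.insert(hnd, 3);      // link the node into the heap, with priority 3
+ //     if (! heap.empty()) {
+ //         int p = heap.get_root_priority();   // p == 3
+ //         heap.pull_root();     // unlink the root; node data remains in the handle
+ //     }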
return nullptr;
}
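+ // Get the second node in the queue (there is never one in the null_mutex variant):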
+ waitqueue_node<null_mutex> * get_second()
+ {
+ return nullptr;
+ }
+
bool check_head(waitqueue_node<null_mutex> &node)
{
return true;
return head;
}
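+ // Get the node after the head of the queue, or nullptr if there is none: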
+ waitqueue_node<T_Mutex> * get_second()
+ {
+ return head->next;
+ }
+
bool check_head(waitqueue_node<T_Mutex> &node)
{
return head == &node;
template <typename Loop>
static rearm process_fd_rearm(Loop &loop, typename Loop::base_fd_watcher *bfw,
- rearm rearm_type, bool is_multi_watch) noexcept
+ rearm rearm_type) noexcept
{
- return loop.process_fd_rearm(bfw, rearm_type, is_multi_watch);
+ return loop.process_fd_rearm(bfw, rearm_type);
+ }
+
+ template <typename Loop>
+ static rearm process_primary_rearm(Loop &loop, typename Loop::base_bidi_fd_watcher *bdfw,
+ rearm rearm_type) noexcept
+ {
+ return loop.process_primary_rearm(bdfw, rearm_type);
}
template <typename Loop>
{
loop.requeue_watcher(watcher);
}
+
+ template <typename Loop>
+ static void release_watcher(Loop &loop, base_watcher *watcher) noexcept
+ {
+ loop.release_watcher(watcher);
+ }
};
// Do standard post-dispatch processing for a watcher. This handles the case of removing or
- // re-queueing watchers depending on the rearm type.
+ // re-queueing watchers depending on the rearm type. This is called from the individual
+ // watcher dispatch functions to handle REMOVE or REQUEUE re-arm values.
template <typename Loop> void post_dispatch(Loop &loop, base_watcher *watcher, rearm rearm_type)
{
if (rearm_type == rearm::REMOVE) {
loop_access::get_base_lock(loop).unlock();
+ loop_access::release_watcher(loop, watcher);
watcher->watch_removed();
loop_access::get_base_lock(loop).lock();
}
}
}
- // This class serves as the base class (mixin) for the backend mechanism.
+ // Post-dispatch handling for bidi fd watchers.
+ template <typename Loop> void post_dispatch(Loop &loop, bidi_fd_watcher<Loop> *bdfd_watcher,
+ base_watcher *out_watcher, rearm rearm_type)
+ {
+ base_watcher *watcher = (base_watcher *)bdfd_watcher;
+ if (rearm_type == rearm::REMOVE) {
+ loop_access::get_base_lock(loop).unlock();
+ loop_access::release_watcher(loop, watcher);
+ loop_access::release_watcher(loop, out_watcher);
+ watcher->watch_removed();
+ loop_access::get_base_lock(loop).lock();
+ }
+ else if (rearm_type == rearm::REQUEUE) {
+ loop_access::requeue_watcher(loop, watcher);
+ }
+ }
+
+ // The event_dispatch class serves as the base class (mixin) for the backend mechanism. It
+ // mostly manages queueing and dequeueing of events and maintains/owns the relevant data
+ // structures, including a mutex lock.
//
- // The event_dispatch class maintains the queued event data structures. It inserts watchers
- // into the queue when events are received (receiveXXX methods). It also owns a mutex used
- // to protect those structures.
+ // The backend mechanism should call one of the receiveXXX functions to notify the
+ // dispatcher of a received event; the corresponding watcher is then queued.
//
- // In general the methods should be called with lock held. In practice this means that the
- // event loop backend implementations must obtain the lock; they are also free to use it to
- // protect their own internal data structures.
+ // In general the functions should be called with lock held. In practice this means that the
+ // event loop backend implementations (that deposit received events here) must obtain the
+ // lock; they are also free to use it to protect their own internal data structures.
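+ //
+ // For example (a sketch only; the exact receiveXXX signature here is an assumption), a
+ // backend might deposit a detected fd event as follows, with the lock already held:
+ //
+ //     receive_fd_event(fd_r, userdata, IN_EVENTS);  // queues the associated watcher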
template <typename Traits, typename LoopTraits> class event_dispatch
{
friend class dasynq::event_loop<typename LoopTraits::mutex_t, LoopTraits>;
// If the watcher is active, set deleteme true; the watcher will be removed
// at the end of current processing (i.e. when active is set false).
watcher->deleteme = true;
- release_watcher(watcher);
lock.unlock();
}
else {
// So, we use two wait queues protected by a single mutex. The "attn_waitqueue"
// (attention queue) is the high-priority queue, used for threads wanting to
// unwatch event sources. The "wait_waitqueue" is the queue used by threads
- // that wish to actually poll for events.
+ // that wish to actually poll for events, while they are waiting for the main
+ // queue to become quiet.
// - The head of the "attn_waitqueue" is always the holder of the lock
// - Therefore, a poll-waiter must be moved from the wait_waitqueue to the
// attn_waitqueue to actually gain the lock. This is only done if the
// attn_waitqueue is otherwise empty.
// - The mutex only protects manipulation of the wait queues, and so should not
// be highly contended.
+ //
+ // To claim the lock for a poll-wait, the procedure is:
+ // - check if the attn_waitqueue is empty;
+ // - if it is, insert node at the head, thus claiming the lock, and return
+ // - otherwise, insert node in the wait_waitqueue, and wait
+ // To claim the lock for an unwatch, the procedure is:
+ // - insert node in the attn_waitqueue
+ // - if the node is at the head of the queue, lock is claimed; return
+ // - otherwise, if a poll is in progress, interrupt it
+ // - wait until our node is at the head of the attn_waitqueue
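+ //
+ // For example, claiming the lock for a poll-wait follows this pattern (a sketch of
+ // what get_pollwait_lock(), below, does):
+ //
+ //     std::unique_lock<mutex_t> ulock(wait_lock);
+ //     if (attn_waitqueue.is_empty()) {
+ //         attn_waitqueue.queue(&qnode);   // empty queue: lock is claimed immediately
+ //     }
+ //     else {
+ //         wait_waitqueue.queue(&qnode);   // wait to be promoted by release_lock()
+ //     }
+ //     while (! attn_waitqueue.check_head(qnode)) {
+ //         qnode.wait(ulock);
+ //     }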
mutex_t wait_lock; // protects the wait/attention queues
+ bool long_poll_running = false; // whether any thread is polling the backend (with non-zero timeout)
waitqueue<mutex_t> attn_waitqueue;
waitqueue<mutex_t> wait_waitqueue;
interrupt_if_necessary();
}
+ void release_watcher(base_watcher *watcher) noexcept
+ {
+ loop_mech.release_watcher(watcher);
+ }
+
// Interrupt the current poll-waiter, if necessary - that is, if the loop is multi-thread safe, and if
// there is currently another thread polling the backend event mechanism.
void interrupt_if_necessary()
// Acquire the attention lock (when held, ensures that no thread is polling the AEN
// mechanism). This can be used to safely remove watches, since it is certain that
- // notification callbacks won't be run while the attention lock is held.
+ // notification callbacks won't be run while the attention lock is held. Any in-progress
+ // poll will be interrupted, so the lock should be acquired quickly.
void get_attn_lock(waitqueue_node<T_Mutex> &qnode) noexcept
{
std::unique_lock<T_Mutex> ulock(wait_lock);
attn_waitqueue.queue(&qnode);
if (! attn_waitqueue.check_head(qnode)) {
- loop_mech.interrupt_wait();
+ if (long_poll_running) {
+ // We want to interrupt any in-progress poll so that the attn queue can progress,
+ // but we don't want to do so unnecessarily. If we are 2nd in the queue then the
+ // head must be doing the poll; interrupt it. Otherwise, we assume the 2nd has
+ // already interrupted it.
+ if (attn_waitqueue.get_second() == &qnode) {
+ loop_mech.interrupt_wait();
+ }
+ }
while (! attn_waitqueue.check_head(qnode)) {
qnode.wait(ulock);
}
}
}
+ // Acquire the attention lock, but without interrupting any poll that's in progress
+ // (prefer to fail in that case).
+ bool poll_attn_lock(waitqueue_node<T_Mutex> &qnode) noexcept
+ {
+ std::unique_lock<T_Mutex> ulock(wait_lock);
+ if (long_poll_running) {
+ // A long poll is in progress; bail out
+ return false;
+ }
+
+ // Nobody's doing a long poll, wait until we're at the head of the attn queue and return
+ // success:
+ attn_waitqueue.queue(&qnode);
+ while (! attn_waitqueue.check_head(qnode)) {
+ qnode.wait(ulock);
+ }
+
+ return true;
+ }
+
// Acquire the poll-wait lock (to be held when polling the AEN mechanism; lower priority than
// the attention lock). The poll-wait lock is used to prevent more than a single thread from
// polling the event loop mechanism at a time; if this is not done, it is basically
while (! attn_waitqueue.check_head(qnode)) {
qnode.wait(ulock);
- }
+ }
+
+ long_poll_running = true;
}
// Release the poll-wait/attention lock.
void release_lock(waitqueue_node<T_Mutex> &qnode) noexcept
{
std::unique_lock<T_Mutex> ulock(wait_lock);
+ long_poll_running = false;
waitqueue_node<T_Mutex> * nhead = attn_waitqueue.unqueue();
if (nhead != nullptr) {
+ // Someone else now owns the lock; signal the new owner to wake it up
nhead->signal();
}
else {
+ // Nobody is waiting in attn_waitqueue (the high-priority queue) so check in
+ // wait_waitqueue (the low-priority queue)
if (! wait_waitqueue.is_empty()) {
auto nhead = wait_waitqueue.get_head();
wait_waitqueue.unqueue();
attn_waitqueue.queue(nhead);
+ long_poll_running = true;
nhead->signal();
}
}
// Note that signal watchers cannot (currently) be disarmed
}
- // Process rearm return for fd_watcher, including the primary watcher of a bidi_fd_watcher
- rearm process_fd_rearm(base_fd_watcher * bfw, rearm rearm_type, bool is_multi_watch) noexcept
+ // Process rearm return from a plain fd_watcher (the primary watcher of a bidi_fd_watcher
+ // is handled separately by process_primary_rearm, below). Depending on the rearm value,
+ // we re-arm, disarm or remove the watcher; for emulated fds it may instead be requeued.
+ rearm process_fd_rearm(base_fd_watcher * bfw, rearm rearm_type) noexcept
{
bool emulatedfd = static_cast<base_watcher *>(bfw)->emulatefd;
+ if (emulatedfd) {
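+ // The fd is emulated (it cannot be watched via the backend mechanism), so there is
+ // no backend watch to arm or disarm; instead, track the enabled state and requeue
+ // the watcher to emulate a readiness event.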
+ if (rearm_type == rearm::REARM) {
+ bfw->emulate_enabled = true;
+ rearm_type = rearm::REQUEUE;
+ }
+ else if (rearm_type == rearm::DISARM) {
+ bfw->emulate_enabled = false;
+ }
+ else if (rearm_type == rearm::NOOP) {
+ if (bfw->emulate_enabled) {
+ rearm_type = rearm::REQUEUE;
+ }
+ }
+ }
+ else if (rearm_type == rearm::REARM) {
+ set_fd_enabled_nolock(bfw, bfw->watch_fd,
+ bfw->watch_flags & (IN_EVENTS | OUT_EVENTS), true);
+ }
+ else if (rearm_type == rearm::DISARM) {
+ loop_mech.disable_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags);
+ }
+ else if (rearm_type == rearm::REMOVE) {
+ loop_mech.remove_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags);
+ }
+ return rearm_type;
+ }
+
+ // Process the rearm value returned from the primary (input) watcher of a bidi_fd_watcher.
+ rearm process_primary_rearm(base_bidi_fd_watcher * bdfw, rearm rearm_type) noexcept
+ {
+ bool emulatedfd = static_cast<base_watcher *>(bdfw)->emulatefd;
+
// Called with lock held
- if (is_multi_watch) {
- base_bidi_fd_watcher * bdfw = static_cast<base_bidi_fd_watcher *>(bfw);
+ if (rearm_type == rearm::REMOVE) {
+ bdfw->read_removed = 1;
- if (rearm_type == rearm::REMOVE) {
- bdfw->read_removed = 1;
-
- if (backend_traits_t::has_separate_rw_fd_watches) {
- bdfw->watch_flags &= ~IN_EVENTS;
- if (! emulatedfd) {
- loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS);
- }
- return bdfw->write_removed ? rearm::REMOVE : rearm::NOOP;
+ if (backend_traits_t::has_separate_rw_fd_watches) {
+ bdfw->watch_flags &= ~IN_EVENTS;
+ if (! emulatedfd) {
+ loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS);
}
- else {
- if (! bdfw->write_removed) {
- if (bdfw->watch_flags & IN_EVENTS) {
- bdfw->watch_flags &= ~IN_EVENTS;
- if (! emulatedfd) {
- set_fd_enabled_nolock(bdfw, bdfw->watch_fd, bdfw->watch_flags,
- bdfw->watch_flags != 0);
- }
- }
- return rearm::NOOP;
- }
- else {
- // both removed: actually remove
+ return bdfw->write_removed ? rearm::REMOVE : rearm::NOOP;
+ }
+ else {
+ if (! bdfw->write_removed) {
+ if (bdfw->watch_flags & IN_EVENTS) {
+ bdfw->watch_flags &= ~IN_EVENTS;
if (! emulatedfd) {
- loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, 0 /* not used */);
+ set_fd_enabled_nolock(bdfw, bdfw->watch_fd, bdfw->watch_flags,
+ bdfw->watch_flags != 0);
}
- return rearm::REMOVE;
}
+ return rearm::NOOP;
}
- }
- else if (rearm_type == rearm::DISARM) {
- bdfw->watch_flags &= ~IN_EVENTS;
-
- if (! emulatedfd) {
- if (! backend_traits_t::has_separate_rw_fd_watches) {
- int watch_flags = bdfw->watch_flags & (IN_EVENTS | OUT_EVENTS);
- set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags, watch_flags != 0);
- }
- else {
- loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS);
+ else {
+ // both removed: actually remove
+ if (! emulatedfd) {
+ loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, 0 /* not used */);
}
+ return rearm::REMOVE;
}
}
- else if (rearm_type == rearm::REARM) {
- if (! emulatedfd) {
- bdfw->watch_flags |= IN_EVENTS;
- if (! backend_traits_t::has_separate_rw_fd_watches) {
- int watch_flags = bdfw->watch_flags;
- set_fd_enabled_nolock(bdfw, bdfw->watch_fd,
- watch_flags & (IN_EVENTS | OUT_EVENTS), true);
- }
- else {
- set_fd_enabled_nolock(bdfw, bdfw->watch_fd, IN_EVENTS, true);
- }
+ }
+ else if (rearm_type == rearm::DISARM) {
+ bdfw->watch_flags &= ~IN_EVENTS;
+
+ if (! emulatedfd) {
+ if (! backend_traits_t::has_separate_rw_fd_watches) {
+ int watch_flags = bdfw->watch_flags & (IN_EVENTS | OUT_EVENTS);
+ set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags, watch_flags != 0);
}
else {
- bdfw->watch_flags &= ~IN_EVENTS;
- rearm_type = rearm::REQUEUE;
+ loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS);
}
}
- else if (rearm_type == rearm::NOOP) {
- if (bdfw->emulatefd) {
- if (bdfw->watch_flags & IN_EVENTS) {
- bdfw->watch_flags &= ~IN_EVENTS;
- rearm_type = rearm::REQUEUE;
- }
- }
- }
- return rearm_type;
}
- else { // Not multi-watch:
- if (emulatedfd) {
- if (rearm_type == rearm::REARM) {
- bfw->emulate_enabled = true;
- rearm_type = rearm::REQUEUE;
- }
- else if (rearm_type == rearm::DISARM) {
- bfw->emulate_enabled = false;
+ else if (rearm_type == rearm::REARM) {
+ if (! emulatedfd) {
+ bdfw->watch_flags |= IN_EVENTS;
+ if (! backend_traits_t::has_separate_rw_fd_watches) {
+ int watch_flags = bdfw->watch_flags;
+ set_fd_enabled_nolock(bdfw, bdfw->watch_fd,
+ watch_flags & (IN_EVENTS | OUT_EVENTS), true);
}
- else if (rearm_type == rearm::NOOP) {
- if (bfw->emulate_enabled) {
- rearm_type = rearm::REQUEUE;
- }
+ else {
+ set_fd_enabled_nolock(bdfw, bdfw->watch_fd, IN_EVENTS, true);
}
}
- else if (rearm_type == rearm::REARM) {
- set_fd_enabled_nolock(bfw, bfw->watch_fd,
- bfw->watch_flags & (IN_EVENTS | OUT_EVENTS), true);
- }
- else if (rearm_type == rearm::DISARM) {
- loop_mech.disable_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags);
+ else {
+ bdfw->watch_flags &= ~IN_EVENTS;
+ rearm_type = rearm::REQUEUE;
}
- else if (rearm_type == rearm::REMOVE) {
- loop_mech.remove_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags);
+ }
+ else if (rearm_type == rearm::NOOP) {
+ if (bdfw->emulatefd) {
+ if (bdfw->watch_flags & IN_EVENTS) {
+ bdfw->watch_flags &= ~IN_EVENTS;
+ rearm_type = rearm::REQUEUE;
+ }
}
- return rearm_type;
}
+ return rearm_type;
}
// Process re-arm for the secondary (output) watcher in a bidirectional fd watcher.
void poll(int limit = -1) noexcept
{
waitqueue_node<T_Mutex> qnode;
- get_pollwait_lock(qnode);
- loop_mech.pull_events(false);
- release_lock(qnode);
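+ // Note: poll_attn_lock() is used rather than get_pollwait_lock(), so that if another
+ // thread is already engaged in a long poll we skip the pull (rather than interrupt
+ // it) and just process already-queued events.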
+ if (poll_attn_lock(qnode)) {
+ loop_mech.pull_events(false);
+ release_lock(qnode);
+ }
process_events(limit);
}
rearm_type = rearm::REMOVE;
}
- rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type, false);
+ rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type);
post_dispatch(loop, this, rearm_type);
}
rearm_type = rearm::REMOVE;
}
- rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type, true);
+ rearm_type = loop_access::process_primary_rearm(loop, this, rearm_type);
- post_dispatch(loop, this, rearm_type);
+ auto &outwatcher = bidi_fd_watcher<EventLoop>::out_watcher;
+ post_dispatch(loop, this, &outwatcher, rearm_type);
}
}
post_dispatch(loop, &outwatcher, rearm_type);
}
else {
- post_dispatch(loop, this, rearm_type);
+ post_dispatch(loop, this, &outwatcher, rearm_type);
}
}
}