--- /dev/null
+#include <vector>
+#include <utility>
+
+#include <sys/timerfd.h>
+#include <time.h>
+
+#include "dasynq-binaryheap.h"
+
+namespace dasynq {
+
+// We could use one timerfd per timer, but then we need to differentiate timer
+// descriptors from regular file descriptors when events are reported by the loop
+// mechanism so that we can correctly report a timer event or fd event.
+
+// With a file descriptor or signal, we can use the item itself as the identifier for
+// adding/removing watches. For timers, it's more complicated. When we add a timer,
+// we are given a handle; we need to use this to modify the watch. We delegate the
+// process of allocating a handle to a priority heap implementation (BinaryHeap).
+
+
+class TimerData
+{
+ public:
+ // initial time?
+ struct timespec interval_time; // interval (if 0, one-off timer)
+ int expiry_count; // number of times expired
+ bool enabled; // whether timer reports events
+ void *userdata;
+
+ TimerData(void *udata = nullptr) : interval_time({0,0}), expiry_count(0), enabled(true), userdata(udata)
+ {
+ // constructor
+ }
+};
+
+class CompareTimespec
+{
+ public:
+ bool operator()(const struct timespec &a, const struct timespec &b)
+ {
+ if (a.tv_sec < b.tv_sec) {
+ return true;
+ }
+
+ if (a.tv_sec == b.tv_sec) {
+ return a.tv_nsec < b.tv_nsec;
+ }
+
+ return false;
+ }
+};
+
+using timer_handle_t = BinaryHeap<TimerData, struct timespec, CompareTimespec>::handle_t;
+
+static void init_timer_handle(timer_handle_t &hnd) noexcept
+{
+ BinaryHeap<TimerData, struct timespec, CompareTimespec>::init_handle(hnd);
+}
+
+
+template <class Base> class TimerFdEvents : public Base
+{
+ private:
+ int timerfd_fd = -1;
+
+ BinaryHeap<TimerData, struct timespec, CompareTimespec> timer_queue;
+
+
+ static int divide_timespec(const struct timespec &num, const struct timespec &den)
+ {
+ // TODO
+ return 0;
+ }
+
+ // Set the timerfd timeout to match the first timer in the queue (disable the timerfd
+ // if there are no active timers).
+ void set_timer_from_queue()
+ {
+ struct itimerspec newtime;
+ if (timer_queue.empty()) {
+ newtime.it_value = {0, 0};
+ newtime.it_interval = {0, 0};
+ }
+ else {
+ newtime.it_value = timer_queue.get_root_priority();
+ newtime.it_interval = {0, 0};
+ }
+ timerfd_settime(timerfd_fd, TFD_TIMER_ABSTIME, &newtime, nullptr);
+ }
+
+ public:
+ template <typename T>
+ void receiveFdEvent(T &loop_mech, typename Base::FD_r fd_r, void * userdata, int flags)
+ {
+ if (userdata == &timerfd_fd) {
+ struct timespec curtime;
+ clock_gettime(CLOCK_MONOTONIC, &curtime); // in theory, can't fail on Linux
+
+ // Peek timer queue; calculate difference between current time and timeout
+ struct timespec * timeout = &timer_queue.get_root_priority();
+ while (timeout->tv_sec < curtime.tv_sec || (timeout->tv_sec == curtime.tv_sec &&
+ timeout->tv_nsec <= curtime.tv_nsec)) {
+ // Increment expiry count
+ timer_queue.node_data(timer_queue.get_root()).expiry_count++;
+ // (a periodic timer may have overrun; calculated below).
+
+ auto thandle = timer_queue.get_root();
+ TimerData &data = timer_queue.node_data(thandle);
+ timespec &interval = data.interval_time;
+ if (interval.tv_sec == 0 && interval.tv_nsec == 0) {
+ // Non periodic timer
+ timer_queue.pull_root();
+ if (data.enabled) {
+ int expiry_count = data.expiry_count;
+ data.expiry_count = 0;
+ Base::receiveTimerExpiry(thandle, data.userdata, expiry_count);
+ }
+ if (timer_queue.empty()) {
+ break;
+ }
+ }
+ else {
+ // Periodic timer TODO
+ // First calculate the overrun in time:
+ /*
+ struct timespec diff;
+ diff.tv_sec = curtime.tv_sec - timeout->tv_sec;
+ diff.tv_nsec = curtime.tv_nsec - timeout->tv_nsec;
+ if (diff.tv_nsec < 0) {
+ diff.tv_nsec += 1000000000;
+ diff.tv_sec--;
+ }
+ */
+ // Now we have to divide the time overrun by the period to find the
+ // interval overrun. This requires a division of a value not representable
+ // as a long...
+ // TODO use divide_timespec
+ // TODO better not to remove from queue maybe, but instead mark as inactive,
+ // adjust timeout, and bubble into correct position
+ // call Base::receieveTimerEvent
+ // TODO
+ }
+
+ // repeat until all expired timeouts processed
+ // timeout = &timer_queue[0].timeout;
+ // (shouldn't be necessary; address hasn't changed...)
+ }
+ // arm timerfd with timeout from head of queue
+ set_timer_from_queue();
+ }
+ else {
+ Base::receiveFdEvent(loop_mech, fd_r, userdata, flags);
+ }
+ }
+
+ template <typename T> void init(T *loop_mech)
+ {
+ timerfd_fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK);
+ if (timerfd_fd == -1) {
+ throw std::system_error(errno, std::system_category());
+ }
+ loop_mech->addFdWatch(timerfd_fd, &timerfd_fd, IN_EVENTS);
+ Base::init(loop_mech);
+ }
+
+ // Add timer, return handle (TODO: clock id param?)
+ void addTimer(timer_handle_t &h, void *userdata)
+ {
+ timer_queue.allocate(h, userdata);
+ }
+
+ void removeTimer(timer_handle_t &timer_id) noexcept
+ {
+ removeTimer_nolock(timer_id);
+ }
+
+ void removeTimer_nolock(timer_handle_t &timer_id) noexcept
+ {
+ if (timer_queue.is_queued(timer_id)) {
+ timer_queue.remove(timer_id);
+ }
+ timer_queue.deallocate(timer_id);
+ }
+
+ // starts (if not started) a timer to timeout at the given time. Resets the expiry count to 0.
+ // enable: specifies whether to enable reporting of timeouts/intervals
+ void setTimer(timer_handle_t & timer_id, struct timespec &timeout, struct timespec &interval, bool enable) noexcept
+ {
+ auto &ts = timer_queue.node_data(timer_id);
+ ts.interval_time = interval;
+ ts.expiry_count = 0;
+ ts.enabled = enable;
+
+ if (timer_queue.is_queued(timer_id)) {
+ // Already queued; alter timeout
+ if (timer_queue.set_priority(timer_id, timeout)) {
+ set_timer_from_queue();
+ }
+ }
+ else {
+ if (timer_queue.insert(timer_id, timeout)) {
+ set_timer_from_queue();
+ }
+ }
+
+ // TODO locking (here and everywhere)
+ }
+
+ // Set timer relative to current time:
+ void setTimerRel(timer_handle_t & timer_id, struct timespec &timeout, struct timespec &interval, bool enable) noexcept
+ {
+ // TODO consider caching current time somehow; need to decide then when to update cached value.
+ struct timespec curtime;
+ clock_gettime(CLOCK_MONOTONIC, &curtime);
+ curtime.tv_sec += timeout.tv_sec;
+ curtime.tv_nsec += timeout.tv_nsec;
+ if (curtime.tv_nsec > 1000000000) {
+ curtime.tv_nsec -= 1000000000;
+ curtime.tv_sec++;
+ }
+ setTimer(timer_id, curtime, interval, enable);
+ }
+
+ // Enables or disabling report of timeouts (does not stop timer)
+ void enableTimer(timer_handle_t & timer_id, bool enable) noexcept
+ {
+ enableTimer_nolock(timer_id, enable);
+ }
+
+ void enableTimer_nolock(timer_handle_t & timer_id, bool enable) noexcept
+ {
+ timer_queue.node_data(timer_id).enabled = enable;
+ }
+};
+
+}
-#ifndef DASYNC_H_INCLUDED
-#define DASYNC_H_INCLUDED
+#ifndef DASYNQ_H_INCLUDED
+#define DASYNQ_H_INCLUDED
#if defined(__OpenBSD__)
-#define HAVE_KQUEUE 1
+#define DASYNQ_HAVE_KQUEUE 1
#endif
#if defined(__linux__)
-#define HAVE_EPOLL 1
+#define DASYNQ_HAVE_EPOLL 1
#endif
#include "dasynq-flags.h"
+#include "dasynq-binaryheap.h"
-#if defined(HAVE_KQUEUE)
+#if defined(DASYNQ_CUSTOM_LOOP_IMPLEMENTATION)
+// Loop and LoopTraits defined already; used for testing
+#elif defined(DASYNQ_HAVE_KQUEUE)
#include "dasynq-kqueue.h"
+#include "dasynq-itimer.h"
#include "dasynq-childproc.h"
namespace dasynq {
- template <typename T> using Loop = KqueueLoop<T>;
+ template <typename T> using Loop = KqueueLoop<ITimerEvents<ChildProcEvents<T>>>;
using LoopTraits = KqueueTraits;
}
-#elif defined(HAVE_EPOLL)
+#elif defined(DASYNQ_HAVE_EPOLL)
#include "dasynq-epoll.h"
+#include "dasynq-timerfd.h"
#include "dasynq-childproc.h"
namespace dasynq {
- template <typename T> using Loop = EpollLoop<T>;
+ template <typename T> using Loop = EpollLoop<TimerFdEvents<ChildProcEvents<T>>>;
using LoopTraits = EpollTraits;
}
#endif
+
#include <atomic>
#include <condition_variable>
#include <cstdint>
#ifdef __GNUC__
#ifndef __clang__
-#define EMPTY_BODY char empty[0]; // Make class instances take up no space (gcc)
+#define DASYNQ_EMPTY_BODY char empty[0]; // Make class instances take up no space (gcc)
#else
-#define EMPTY_BODY char empty[0] __attribute__((unused)); // Make class instances take up no space (clang)
+#define DASYNQ_EMPTY_BODY char empty[0] __attribute__((unused)); // Make class instances take up no space (clang)
#endif
#endif
namespace dasynq {
+namespace dprivate {
+ class BaseWatcher;
+}
+
+using PrioQueue = BinaryHeap<dprivate::BaseWatcher *, int>;
+
+inline namespace {
+ constexpr int DEFAULT_PRIORITY = 50;
+}
/**
* Values for rearm/disarm return from event handlers
*/
-enum class Rearm
+enum class rearm
{
/** Re-arm the event watcher so that it receives further events */
REARM,
// TODO: add a REQUEUE option, which means, "I didn't complete input/output, run me again soon"
};
+// Different timer clock types
+enum class ClockType
+{
+ WALLTIME,
+ MONOTONIC
+};
+
// Information about a received signal.
// This is essentially a wrapper for the POSIX siginfo_t; its existence allows for mechanisms that receive
// equivalent signal information in a different format (eg signalfd on Linux).
using SigInfo = LoopTraits::SigInfo;
// Forward declarations:
-template <typename T_Mutex> class EventLoop;
+template <typename T_Mutex> class event_loop;
namespace dprivate {
// (non-public API)
- template <typename T_Mutex> class FdWatcher;
- template <typename T_Mutex> class BidiFdWatcher;
- template <typename T_Mutex> class SignalWatcher;
- template <typename T_Mutex> class ChildProcWatcher;
+ template <typename T_Loop> class FdWatcher;
+ template <typename T_Loop> class BidiFdWatcher;
+ template <typename T_Loop> class SignalWatcher;
+ template <typename T_Loop> class ChildProcWatcher;
+ template <typename T_Loop> class Timer;
enum class WatchType
{
SIGNAL,
FD,
CHILD,
- SECONDARYFD
+ SECONDARYFD,
+ TIMER
};
template <typename T_Mutex, typename Traits> class EventDispatch;
class BaseWatcher
{
template <typename T_Mutex, typename Traits> friend class EventDispatch;
- template <typename T_Mutex> friend class dasynq::EventLoop;
+ template <typename T_Mutex> friend class dasynq::event_loop;
protected:
WatchType watchType;
- int active : 1;
- int deleteme : 1;
+ int active : 1; // currently executing handler?
+ int deleteme : 1; // delete when handler finished?
- BaseWatcher * prev;
- BaseWatcher * next;
+ PrioQueue::handle_t heap_handle;
+ int priority;
+
+ static void set_priority(BaseWatcher &p, int prio)
+ {
+ p.priority = prio;
+ }
public:
{
active = false;
deleteme = false;
- prev = nullptr;
- next = nullptr;
+ PrioQueue::init_handle(heap_handle);
+ priority = DEFAULT_PRIORITY;
}
BaseWatcher(WatchType wt) noexcept : watchType(wt) { }
class BaseSignalWatcher : public BaseWatcher
{
template <typename M, typename Traits> friend class EventDispatch;
- friend class dasynq::EventLoop<T_Mutex>;
+ friend class dasynq::event_loop<T_Mutex>;
protected:
SigInfo siginfo;
public:
typedef SigInfo &SigInfo_p;
- virtual Rearm received(EventLoop<T_Mutex> &eloop, int signo, SigInfo_p siginfo) = 0;
+ virtual rearm received(event_loop<T_Mutex> &eloop, int signo, SigInfo_p siginfo) = 0;
};
template <typename T_Mutex>
class BaseFdWatcher : public BaseWatcher
{
template <typename, typename Traits> friend class EventDispatch;
- friend class dasynq::EventLoop<T_Mutex>;
+ friend class dasynq::event_loop<T_Mutex>;
protected:
int watch_fd;
BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { }
public:
- virtual Rearm fdEvent(EventLoop<T_Mutex> &eloop, int fd, int flags) = 0;
+ virtual rearm fdEvent(event_loop<T_Mutex> &eloop, int fd, int flags) = 0;
};
template <typename T_Mutex>
class BaseBidiFdWatcher : public BaseFdWatcher<T_Mutex>
{
template <typename, typename Traits> friend class EventDispatch;
- friend class dasynq::EventLoop<T_Mutex>;
+ friend class dasynq::event_loop<T_Mutex>;
// This should never actually get called:
- Rearm fdEvent(EventLoop<T_Mutex> &eloop, int fd, int flags) final
+ rearm fdEvent(event_loop<T_Mutex> &eloop, int fd, int flags) final
{
- return Rearm::REARM; // should not be reachable.
+ return rearm::REARM; // should not be reachable.
};
protected:
int write_removed : 1; // write watch removed?
public:
- virtual Rearm readReady(EventLoop<T_Mutex> &eloop, int fd) noexcept = 0;
- virtual Rearm writeReady(EventLoop<T_Mutex> &eloop, int fd) noexcept = 0;
+ virtual rearm readReady(event_loop<T_Mutex> &eloop, int fd) noexcept = 0;
+ virtual rearm writeReady(event_loop<T_Mutex> &eloop, int fd) noexcept = 0;
};
template <typename T_Mutex>
class BaseChildWatcher : public BaseWatcher
{
template <typename, typename Traits> friend class EventDispatch;
- friend class dasynq::EventLoop<T_Mutex>;
+ friend class dasynq::event_loop<T_Mutex>;
protected:
pid_t watch_pid;
BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { }
public:
- virtual Rearm childStatus(EventLoop<T_Mutex> &eloop, pid_t child, int status) = 0;
+ virtual rearm childStatus(event_loop<T_Mutex> &eloop, pid_t child, int status) = 0;
+ };
+
+
+ template <typename T_Mutex>
+ class BaseTimerWatcher : public BaseWatcher
+ {
+ template <typename, typename Traits> friend class EventDispatch;
+ friend class dasynq::event_loop<T_Mutex>;
+
+ protected:
+ timer_handle_t timer_handle;
+ int intervals;
+
+ BaseTimerWatcher() : BaseWatcher(WatchType::TIMER)
+ {
+ init_timer_handle(timer_handle);
+ }
+
+ public:
+ // Timer expired, and the given number of intervals have elapsed before
+ // expiry evenet was queued. Normally intervals == 1 to indicate no
+ // overrun.
+ virtual rearm timerExpiry(event_loop<T_Mutex> &eloop, int intervals) = 0;
};
// Classes for implementing a fair(ish) wait queue.
template <> class waitqueue_node<NullMutex>
{
// Specialised waitqueue_node for NullMutex.
- // TODO can this be reduced to 0 data members?
friend class waitqueue<NullMutex>;
- waitqueue_node * next = nullptr;
public:
void wait(std::unique_lock<NullMutex> &ul) { }
void signal() { }
+
+ DASYNQ_EMPTY_BODY;
};
template <typename T_Mutex> class waitqueue_node
}
};
+ template <> class waitqueue<NullMutex>
+ {
+ public:
+ waitqueue_node<NullMutex> * unqueue()
+ {
+ return nullptr;
+ }
+
+ waitqueue_node<NullMutex> * getHead()
+ {
+ return nullptr;
+ }
+
+ bool checkHead(waitqueue_node<NullMutex> &node)
+ {
+ return true;
+ }
+
+ bool isEmpty()
+ {
+ return true;
+ }
+
+ void queue(waitqueue_node<NullMutex> *node)
+ {
+ }
+ };
+
template <typename T_Mutex> class waitqueue
{
waitqueue_node<T_Mutex> * tail = nullptr;
return head;
}
+ bool checkHead(waitqueue_node<T_Mutex> &node)
+ {
+ return head == &node;
+ }
+
+ bool isEmpty()
+ {
+ return head == nullptr;
+ }
+
void queue(waitqueue_node<T_Mutex> *node)
{
if (tail) {
}
}
};
-
+
// This class serves as the base class (mixin) for the AEN mechanism class.
//
// The EventDispatch class maintains the queued event data structures. It inserts watchers
// into the queue when eventes are received (receiveXXX methods).
template <typename T_Mutex, typename Traits> class EventDispatch : public Traits
{
- friend class EventLoop<T_Mutex>;
+ friend class event_loop<T_Mutex>;
// queue data structure/pointer
- BaseWatcher * first = nullptr;
+ PrioQueue event_queue;
using BaseSignalWatcher = dasynq::dprivate::BaseSignalWatcher<T_Mutex>;
using BaseFdWatcher = dasynq::dprivate::BaseFdWatcher<T_Mutex>;
using BaseBidiFdWatcher = dasynq::dprivate::BaseBidiFdWatcher<T_Mutex>;
using BaseChildWatcher = dasynq::dprivate::BaseChildWatcher<T_Mutex>;
+ using BaseTimerWatcher = dasynq::dprivate::BaseTimerWatcher<T_Mutex>;
- void queueWatcher(BaseWatcher *bwatcher)
+ // Add a watcher into the queuing system (but don't queue it)
+ // may throw: std::bad_alloc
+ void prepare_watcher(BaseWatcher *bwatcher)
{
- // Put in queue:
- if (first == nullptr) {
- bwatcher->prev = bwatcher;
- bwatcher->next = bwatcher;
- first = bwatcher;
- }
- else {
- first->prev->next = bwatcher;
- bwatcher->prev = first->prev;
- first->prev = bwatcher;
- bwatcher->next = first;
- }
+ event_queue.allocate(bwatcher->heap_handle, bwatcher);
+ }
+
+ void queueWatcher(BaseWatcher *bwatcher) noexcept
+ {
+ event_queue.insert(bwatcher->heap_handle, bwatcher->priority);
}
bool isQueued(BaseWatcher *bwatcher)
{
- return bwatcher->prev != nullptr;
+ return event_queue.is_queued(bwatcher->heap_handle);
}
void dequeueWatcher(BaseWatcher *bwatcher)
{
- if (bwatcher->prev == bwatcher) {
- // Only item in queue
- first = nullptr;
+ if (event_queue.is_queued(bwatcher->heap_handle)) {
+ event_queue.remove(bwatcher->heap_handle);
}
- else {
- if (first == bwatcher) first = first->next;
- bwatcher->prev->next = bwatcher->next;
- bwatcher->next->prev = bwatcher->prev;
- }
-
- bwatcher->prev = nullptr;
- bwatcher->next = nullptr;
+ }
+
+ // Remove watcher from the queueing system
+ void release_watcher(BaseWatcher *bwatcher)
+ {
+ event_queue.deallocate(bwatcher->heap_handle);
}
protected:
T_Mutex lock;
- void receiveSignal(typename Traits::SigInfo & siginfo, void * userdata)
+ // Receive a signal; return true to disable signal watch or false to leave enabled
+ template <typename T>
+ bool receiveSignal(T &loop_mech, typename Traits::SigInfo & siginfo, void * userdata)
{
BaseSignalWatcher * bwatcher = static_cast<BaseSignalWatcher *>(userdata);
bwatcher->siginfo = siginfo;
queueWatcher(bwatcher);
+ return true;
}
template <typename T>
queueWatcher(watcher);
}
+ void receiveTimerExpiry(timer_handle_t & timer_handle, void * userdata, int intervals)
+ {
+ BaseTimerWatcher * watcher = static_cast<BaseTimerWatcher *>(userdata);
+ watcher->intervals = intervals;
+ queueWatcher(watcher);
+ }
+
// Pull a single event from the queue
BaseWatcher * pullEvent()
{
- BaseWatcher * r = first;
- if (r != nullptr) {
- dequeueWatcher(r);
+ if (event_queue.empty()) {
+ return nullptr;
}
+
+ auto rhndl = event_queue.get_root();
+ BaseWatcher *r = event_queue.node_data(rhndl);
+ event_queue.pull_root();
return r;
}
// If the watcher is active, set deleteme true; the watcher will be removed
// at the end of current processing (i.e. when active is set false).
watcher->deleteme = true;
+ release_watcher(watcher);
lock.unlock();
}
else {
// Actually do the delete.
- if (isQueued(watcher)) {
- dequeueWatcher(watcher);
- }
+ dequeueWatcher(watcher);
+ release_watcher(watcher);
lock.unlock();
watcher->watchRemoved();
if (watcher->active) {
watcher->deleteme = true;
+ release_watcher(watcher);
}
else {
- if (isQueued(watcher)) {
- dequeueWatcher(watcher);
- }
-
+ dequeueWatcher(watcher);
+ release_watcher(watcher);
watcher->read_removed = true;
}
BaseWatcher *secondary = &(watcher->outWatcher);
if (secondary->active) {
secondary->deleteme = true;
+ release_watcher(watcher);
}
else {
- if (isQueued(secondary)) {
- dequeueWatcher(secondary);
- }
-
+ dequeueWatcher(secondary);
+ release_watcher(watcher);
watcher->write_removed = true;
}
}
-template <typename T_Mutex> class EventLoop
+template <typename T_Mutex> class event_loop
{
- friend class dprivate::FdWatcher<EventLoop<T_Mutex>>;
- friend class dprivate::BidiFdWatcher<EventLoop<T_Mutex>>;
- friend class dprivate::SignalWatcher<EventLoop<T_Mutex>>;
- friend class dprivate::ChildProcWatcher<EventLoop<T_Mutex>>;
+ friend class dprivate::FdWatcher<event_loop<T_Mutex>>;
+ friend class dprivate::BidiFdWatcher<event_loop<T_Mutex>>;
+ friend class dprivate::SignalWatcher<event_loop<T_Mutex>>;
+ friend class dprivate::ChildProcWatcher<event_loop<T_Mutex>>;
+ friend class dprivate::Timer<event_loop<T_Mutex>>;
public:
using LoopTraits = dasynq::LoopTraits;
using BaseFdWatcher = dprivate::BaseFdWatcher<T_Mutex>;
using BaseBidiFdWatcher = dprivate::BaseBidiFdWatcher<T_Mutex>;
using BaseChildWatcher = dprivate::BaseChildWatcher<T_Mutex>;
+ using BaseTimerWatcher = dprivate::BaseTimerWatcher<T_Mutex>;
using WatchType = dprivate::WatchType;
- Loop<ChildProcEvents<EventDispatch<T_Mutex, LoopTraits>>> loop_mech;
+ Loop<EventDispatch<T_Mutex, LoopTraits>> loop_mech;
// There is a complex problem with most asynchronous event notification mechanisms
// when used in a multi-threaded environment. Generally, a file descriptor or other
void registerSignal(BaseSignalWatcher *callBack, int signo)
{
- loop_mech.addSignalWatch(signo, callBack);
+ loop_mech.prepare_watcher(callBack);
+ try {
+ loop_mech.addSignalWatch(signo, callBack);
+ }
+ catch (...) {
+ loop_mech.release_watcher(callBack);
+ throw;
+ }
}
void deregister(BaseSignalWatcher *callBack, int signo) noexcept
void registerFd(BaseFdWatcher *callback, int fd, int eventmask, bool enabled)
{
- loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, enabled);
+ loop_mech.prepare_watcher(callback);
+ try {
+ loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT, enabled);
+ }
+ catch (...) {
+ loop_mech.release_watcher(callback);
+ throw;
+ }
}
void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask)
{
- if (LoopTraits::has_separate_rw_fd_watches) {
- loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT);
+ loop_mech.prepare_watcher(callback);
+ try {
+ loop_mech.prepare_watcher(&callback->outWatcher);
+ try {
+ if (LoopTraits::has_separate_rw_fd_watches) {
+ loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT);
+ }
+ else {
+ loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT);
+ }
+ }
+ catch (...) {
+ loop_mech.release_watcher(&callback->outWatcher);
+ throw;
+ }
}
- else {
- loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT);
+ catch (...) {
+ loop_mech.release_watcher(callback);
+ throw;
}
}
releaseLock(qnode);
}
- void reserveChildWatch(BaseChildWatcher *callBack)
+ void reserveChildWatch(BaseChildWatcher *callback)
{
- loop_mech.reserveChildWatch();
+ loop_mech.prepare_watcher(callback);
+ try {
+ loop_mech.reserveChildWatch();
+ }
+ catch (...) {
+ loop_mech.release_watcher(callback);
+ throw;
+ }
}
- void unreserve(BaseChildWatcher *callBack)
+ void unreserve(BaseChildWatcher *callback)
{
loop_mech.unreserveChildWatch();
+ loop_mech.release_watcher(callback);
}
- void registerChild(BaseChildWatcher *callBack, pid_t child)
+ void registerChild(BaseChildWatcher *callback, pid_t child)
{
- loop_mech.addChildWatch(child, callBack);
+ loop_mech.prepare_watcher(callback);
+ try {
+ loop_mech.addChildWatch(child, callback);
+ }
+ catch (...) {
+ loop_mech.release_watcher(callback);
+ throw;
+ }
}
void registerReservedChild(BaseChildWatcher *callBack, pid_t child) noexcept
releaseLock(qnode);
}
- void dequeueWatcher(BaseWatcher *watcher) noexcept
+ void registerTimer(BaseTimerWatcher *callback)
{
- if (loop_mech.isQueued(watcher)) {
- loop_mech.dequeueWatcher(watcher);
+ loop_mech.prepare_watcher(callback);
+ try {
+ loop_mech.addTimer(callback->timer_handle, callback);
}
+ catch (...) {
+ loop_mech.release_watcher(callback);
+ }
+ }
+
+ void setTimer(BaseTimerWatcher *callBack, struct timespec &timeout)
+ {
+ struct timespec interval {0, 0};
+ loop_mech.setTimer(callBack->timer_handle, timeout, interval, true);
+ }
+
+ void setTimer(BaseTimerWatcher *callBack, struct timespec &timeout, struct timespec &interval)
+ {
+ loop_mech.setTimer(callBack->timer_handle, timeout, interval, true);
+ }
+
+ void setTimerRel(BaseTimerWatcher *callBack, struct timespec &timeout)
+ {
+ struct timespec interval {0, 0};
+ loop_mech.setTimerRel(callBack->timer_handle, timeout, interval, true);
+ }
+
+ void setTimerRel(BaseTimerWatcher *callBack, struct timespec &timeout, struct timespec &interval)
+ {
+ loop_mech.setTimerRel(callBack->timer_handle, timeout, interval, true);
+ }
+
+ void deregister(BaseTimerWatcher *callback)
+ {
+ loop_mech.removeTimer(callback->timer_handle);
+
+ waitqueue_node<T_Mutex> qnode;
+ getAttnLock(qnode);
+
+ EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
+ ed.issueDelete(callback);
+
+ releaseLock(qnode);
+ }
+
+ void dequeueWatcher(BaseWatcher *watcher) noexcept
+ {
+ loop_mech.dequeueWatcher(watcher);
}
// Acquire the attention lock (when held, ensures that no thread is polling the AEN
{
std::unique_lock<T_Mutex> ulock(wait_lock);
attn_waitqueue.queue(&qnode);
- if (attn_waitqueue.getHead() != &qnode) {
+ if (! attn_waitqueue.checkHead(qnode)) {
loop_mech.interruptWait();
- while (attn_waitqueue.getHead() != &qnode) {
+ while (! attn_waitqueue.checkHead(qnode)) {
qnode.wait(ulock);
}
}
void getPollwaitLock(waitqueue_node<T_Mutex> &qnode)
{
std::unique_lock<T_Mutex> ulock(wait_lock);
- if (attn_waitqueue.getHead() == nullptr) {
+ if (attn_waitqueue.isEmpty()) {
// Queue is completely empty:
attn_waitqueue.queue(&qnode);
}
wait_waitqueue.queue(&qnode);
}
- while (attn_waitqueue.getHead() != &qnode) {
+ while (! attn_waitqueue.checkHead(qnode)) {
qnode.wait(ulock);
}
}
}
}
- void processSignalRearm(BaseSignalWatcher * bsw, Rearm rearmType)
+ void processSignalRearm(BaseSignalWatcher * bsw, rearm rearmType)
{
// Called with lock held
- if (rearmType == Rearm::REARM) {
+ if (rearmType == rearm::REARM) {
loop_mech.rearmSignalWatch_nolock(bsw->siginfo.get_signo());
}
- else if (rearmType == Rearm::REMOVE) {
+ else if (rearmType == rearm::REMOVE) {
loop_mech.removeSignalWatch_nolock(bsw->siginfo.get_signo());
}
}
- Rearm processFdRearm(BaseFdWatcher * bfw, Rearm rearmType, bool is_multi_watch)
+ rearm processFdRearm(BaseFdWatcher * bfw, rearm rearmType, bool is_multi_watch)
{
// Called with lock held;
// bdfw->event_flags contains only with pending (queued) events
if (is_multi_watch) {
BaseBidiFdWatcher * bdfw = static_cast<BaseBidiFdWatcher *>(bfw);
- if (rearmType == Rearm::REMOVE) {
+ if (rearmType == rearm::REMOVE) {
bdfw->read_removed = 1;
bdfw->watch_flags &= ~IN_EVENTS;
if (! LoopTraits::has_separate_rw_fd_watches) {
if (! bdfw->write_removed) {
- return Rearm::NOOP;
+ return rearm::NOOP;
}
else {
// both removed: actually remove
loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */);
- return Rearm::REMOVE;
+ return rearm::REMOVE;
}
}
else {
loop_mech.removeFdWatch_nolock(bdfw->watch_fd, IN_EVENTS);
}
}
- else if (rearmType == Rearm::DISARM) {
+ else if (rearmType == rearm::DISARM) {
// Nothing more to do
}
- else if (rearmType == Rearm::REARM) {
+ else if (rearmType == rearm::REARM) {
bdfw->watch_flags |= IN_EVENTS;
if (! LoopTraits::has_separate_rw_fd_watches) {
}
return rearmType;
}
- else {
- if (rearmType == Rearm::REARM) {
+ else { // Not multi-watch:
+ if (rearmType == rearm::REARM) {
loop_mech.enableFdWatch_nolock(bfw->watch_fd, bfw,
(bfw->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT);
}
- else if (rearmType == Rearm::REMOVE) {
+ else if (rearmType == rearm::REMOVE) {
loop_mech.removeFdWatch_nolock(bfw->watch_fd, bfw->watch_flags);
}
return rearmType;
}
// Process re-arm for the secondary (output) watcher in a Bi-direction Fd watcher.
- Rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, Rearm rearmType)
+ rearm processSecondaryRearm(BaseBidiFdWatcher * bdfw, rearm rearmType)
{
// Called with lock held
- if (rearmType == Rearm::REMOVE) {
+ if (rearmType == rearm::REMOVE) {
bdfw->write_removed = 1;
bdfw->watch_flags &= ~OUT_EVENTS;
if (LoopTraits::has_separate_rw_fd_watches) {
loop_mech.removeFdWatch_nolock(bdfw->watch_fd, OUT_EVENTS);
- return bdfw->read_removed ? Rearm::REMOVE : Rearm::NOOP;
+ return bdfw->read_removed ? rearm::REMOVE : rearm::NOOP;
}
else {
if (! bdfw->read_removed) {
- return Rearm::NOOP;
+ return rearm::NOOP;
}
else {
// both removed: actually remove
loop_mech.removeFdWatch_nolock(bdfw->watch_fd, 0 /* not used */);
- return Rearm::REMOVE;
+ return rearm::REMOVE;
}
}
}
- else if (rearmType == Rearm::DISARM) {
+ else if (rearmType == rearm::DISARM) {
// Nothing more to do
}
- else if (rearmType == Rearm::REARM) {
+ else if (rearmType == rearm::REARM) {
bdfw->watch_flags |= OUT_EVENTS;
if (! LoopTraits::has_separate_rw_fd_watches) {
}
return rearmType;
}
+
+ void processTimerRearm(BaseTimerWatcher *btw, rearm rearmType)
+ {
+ // Called with lock held
+ if (rearmType == rearm::REARM) {
+ loop_mech.enableTimer_nolock(btw->timer_handle, true);
+ }
+ else if (rearmType == rearm::REMOVE) {
+ loop_mech.removeTimer_nolock(btw->timer_handle);
+ }
+ }
bool processEvents() noexcept
{
pqueue->active = true;
active = true;
- Rearm rearmType = Rearm::NOOP;
+ rearm rearmType = rearm::NOOP;
bool is_multi_watch = false;
BaseBidiFdWatcher *bbfw = nullptr;
bbfw->event_flags &= ~OUT_EVENTS;
break;
}
+ case WatchType::TIMER: {
+ BaseTimerWatcher *btw = static_cast<BaseTimerWatcher *>(pqueue);
+ rearmType = btw->timerExpiry(*this, btw->intervals);
+ break;
+ }
default: ;
}
ed.lock.lock();
// (if REMOVED, we must not touch pqueue at all)
- if (rearmType != Rearm::REMOVED) {
+ if (rearmType != rearm::REMOVED) {
pqueue->active = false;
if (pqueue->deleteme) {
// We don't want a watch that is marked "deleteme" to re-arm itself.
- rearmType = Rearm::REMOVE;
+ rearmType = rearm::REMOVE;
}
switch (pqueue->watchType) {
case WatchType::SIGNAL:
case WatchType::SECONDARYFD:
rearmType = processSecondaryRearm(bbfw, rearmType);
break;
+ case WatchType::TIMER:
+ processTimerRearm(static_cast<BaseTimerWatcher *>(pqueue), rearmType);
+ break;
default: ;
}
- if (pqueue->deleteme || rearmType == Rearm::REMOVE) {
+ if (rearmType == rearm::REMOVE) {
ed.lock.unlock();
+ // Note that for BidiFd watches, watchRemoved is only called on the primary watch.
+ // The process function called above only returns Rearm::REMOVE if both primary and
+ // secondary watches have been removed.
(is_multi_watch ? bbfw : pqueue)->watchRemoved();
ed.lock.lock();
}
public:
using mutex_t = T_Mutex;
- using FdWatcher = dprivate::FdWatcher<EventLoop<T_Mutex>>;
- using BidiFdWatcher = dprivate::BidiFdWatcher<EventLoop<T_Mutex>>;
- using SignalWatcher = dprivate::SignalWatcher<EventLoop<T_Mutex>>;
- using ChildProcWatcher = dprivate::ChildProcWatcher<EventLoop<T_Mutex>>;
+ using FdWatcher = dprivate::FdWatcher<event_loop<T_Mutex>>;
+ using BidiFdWatcher = dprivate::BidiFdWatcher<event_loop<T_Mutex>>;
+ using SignalWatcher = dprivate::SignalWatcher<event_loop<T_Mutex>>;
+ using ChildProcWatcher = dprivate::ChildProcWatcher<event_loop<T_Mutex>>;
+ using Timer = dprivate::Timer<event_loop<T_Mutex>>;
// using LoopTraits = dasynq::LoopTraits;
}
};
-typedef EventLoop<NullMutex> NEventLoop;
-typedef EventLoop<std::mutex> TEventLoop;
+typedef event_loop<NullMutex> NEventLoop;
+typedef event_loop<std::mutex> TEventLoop;
// from dasync.cc:
TEventLoop & getSystemLoop();
// If an attempt is made to register with more than one event loop at
// a time, behaviour is undefined. The signal should be masked before
// call.
- inline void addWatch(EventLoop &eloop, int signo)
+ inline void addWatch(EventLoop &eloop, int signo, int prio = DEFAULT_PRIORITY)
{
BaseWatcher::init();
+ this->priority = prio;
this->siginfo.set_signo(signo);
eloop.registerSignal(this, signo);
}
eloop.deregister(this, this->siginfo.get_signo());
}
- // virtual Rearm received(EventLoop &, int signo, SigInfo_p info) = 0;
+ // virtual rearm received(EventLoop &, int signo, SigInfo_p info) = 0;
};
// Posix file descriptor event watcher
// causes undefined behavior.
//
// Can fail with std::bad_alloc or std::system_error.
- void addWatch(EventLoop &eloop, int fd, int flags, bool enabled = true)
+ void addWatch(EventLoop &eloop, int fd, int flags, bool enabled = true, int prio = DEFAULT_PRIORITY)
{
BaseWatcher::init();
+ this->priority = prio;
this->watch_fd = fd;
this->watch_flags = flags;
eloop.registerFd(this, fd, flags, enabled);
// (which will not occur until the event handler returns, if it is active).
// In a single threaded environment, it is safe to delete the watcher after
// calling this method as long as the handler (if it is active) accesses no
- // internal state and returns Rearm::REMOVED.
+ // internal state and returns rearm::REMOVED.
void deregister(EventLoop &eloop) noexcept
{
eloop.deregister(this, this->watch_fd);
}
}
- // virtual Rearm fdEvent(EventLoop<T_Mutex> *, int fd, int flags) = 0;
+ // Add an Fd watch via a lambda. The watch is allocated dynamically and destroys
+ // itself when removed from the event loop.
+ template <typename T>
+ static FdWatcher<EventLoop> *addWatch(EventLoop &eloop, int fd, int flags, T watchHndlr)
+ {
+ class LambdaFdWatcher : public FdWatcher<EventLoop>
+ {
+ private:
+ T watchHndlr;
+
+ public:
+ LambdaFdWatcher(T watchHandlr_a) : watchHndlr(watchHandlr_a)
+ {
+ //
+ }
+
+ rearm fdEvent(EventLoop &eloop, int fd, int flags) override
+ {
+ return watchHndlr(eloop, fd, flags);
+ }
+
+ void watchRemoved() noexcept override
+ {
+ delete this;
+ }
+ };
+
+ LambdaFdWatcher * lfd = new LambdaFdWatcher(watchHndlr);
+ lfd->addWatch(eloop, fd, flags);
+ return lfd;
+ }
+
+ // virtual rearm fdEvent(EventLoop<T_Mutex> &, int fd, int flags) = 0;
};
// A Bi-directional file descriptor watcher with independent read- and write- channels.
// can be any combination of dasynq::IN_EVENTS / dasynq::OUT_EVENTS.
//
// Can fail with std::bad_alloc or std::system_error.
- void addWatch(EventLoop &eloop, int fd, int flags)
+ void addWatch(EventLoop &eloop, int fd, int flags, int inprio = DEFAULT_PRIORITY, int outprio = DEFAULT_PRIORITY)
{
BaseWatcher::init();
this->outWatcher.BaseWatcher::init();
this->watch_flags = flags | dprivate::multi_watch;
this->read_removed = false;
this->write_removed = false;
+ this->priority = inprio;
+ this->set_priority(this->outWatcher, outprio);
eloop.registerFd(this, fd, flags);
}
// (which will not occur until the event handler returns, if it is active).
// In a single threaded environment, it is safe to delete the watcher after
// calling this method as long as the handler (if it is active) accesses no
- // internal state and returns Rearm::REMOVED.
+ // internal state and returns rearm::REMOVED.
void deregister(EventLoop &eloop) noexcept
{
eloop.deregister(this, this->watch_fd);
}
- // Rearm readReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
- // Rearm writeReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
+ // rearm readReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
+ // rearm writeReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
};
// Child process event watcher
// Registration can fail with std::bad_alloc.
// Note that in multi-threaded programs, use of this function may be prone to a
// race condition such that the child terminates before the watcher is registered.
- void addWatch(EventLoop &eloop, pid_t child)
+ void addWatch(EventLoop &eloop, pid_t child, int prio = DEFAULT_PRIORITY)
{
BaseWatcher::init();
this->watch_pid = child;
+ this->priority = prio;
eloop.registerChild(this, child);
}
// after having reserved resources previously (using reserveWith).
// Registration cannot fail.
// Note that in multi-threaded programs, use of this function may be prone to a
- // race condition such that the child terminates before the watcher is registered.
- void addReserved(EventLoop &eloop, pid_t child) noexcept
+ // race condition such that the child terminates before the watcher is registered;
+ // use the "fork" member function to avoid this.
+ void addReserved(EventLoop &eloop, pid_t child, int prio = DEFAULT_PRIORITY) noexcept
{
BaseWatcher::init();
+ this->watch_pid = child;
+ this->priority = prio;
eloop.registerReservedChild(this, child);
}
}
}
- // virtual Rearm childStatus(EventLoop &, pid_t child, int status) = 0;
+ // virtual rearm childStatus(EventLoop &, pid_t child, int status) = 0;
+};
+
+template <typename EventLoop>
+class Timer : private BaseTimerWatcher<typename EventLoop::mutex_t>
+{
+ public:
+
+ // Allocate a timer (using the MONOTONIC clock)
+ void addTimer(EventLoop &eloop, int prio = DEFAULT_PRIORITY)
+ {
+ this->priority = prio;
+ eloop.registerTimer(this);
+ }
+
+ void armTimer(EventLoop &eloop, struct timespec &timeout) noexcept
+ {
+ eloop.setTimer(this, timeout);
+ }
+
+ void armTimer(EventLoop &eloop, struct timespec &timeout, struct timespec &interval) noexcept
+ {
+ eloop.setTimer(this, timeout, interval);
+ }
+
+ // Arm timer, relative to now:
+ void armTimerRel(EventLoop &eloop, struct timespec &timeout) noexcept
+ {
+ eloop.setTimerRel(this, timeout);
+ }
+
+ void armTimerRel(EventLoop &eloop, struct timespec &timeout, struct timespec &interval) noexcept
+ {
+ eloop.setTimerRel(this, timeout, interval);
+ }
+
+ void deregister(EventLoop &eloop) noexcept
+ {
+ eloop.deregister(this);
+ }
};
} // namespace dasynq::dprivate