Hopefully the Dasynq API is now fairly stable.
char outbuf[] = { DINIT_RP_BADREQ };
if (! queuePacket(outbuf, 1)) return false;
bad_conn_close = true;
- iob.setWatchFlags(OUT_EVENTS);
+ iob.setWatches(OUT_EVENTS);
}
return true;
}
char badreqRep[] = { DINIT_RP_BADREQ };
if (! queuePacket(badreqRep, 1)) return false;
bad_conn_close = true;
- iob.setWatchFlags(OUT_EVENTS);
+ iob.setWatches(OUT_EVENTS);
return true;
}
char badreqRep[] = { DINIT_RP_BADREQ };
if (! queuePacket(badreqRep, 1)) return false;
bad_conn_close = true;
- iob.setWatchFlags(OUT_EVENTS);
+ iob.setWatches(OUT_EVENTS);
return true;
}
else {
char badreqRep[] = { DINIT_RP_BADREQ };
if (! queuePacket(badreqRep, 1)) return false;
bad_conn_close = true;
- iob.setWatchFlags(OUT_EVENTS);
+ iob.setWatches(OUT_EVENTS);
return true;
}
else {
else {
if ((unsigned)wr == size) {
// Ok, all written.
- iob.setWatchFlags(in_flag);
+ iob.setWatches(in_flag);
return true;
}
pkt += wr;
// Create a vector out of the (remaining part of the) packet:
try {
outbuf.emplace_back(pkt, pkt + size);
- iob.setWatchFlags(in_flag | OUT_EVENTS);
+ iob.setWatches(in_flag | OUT_EVENTS);
return true;
}
catch (std::bad_alloc &baexc) {
return false;
}
else {
- iob.setWatchFlags(OUT_EVENTS);
+ iob.setWatches(OUT_EVENTS);
return true;
}
}
else {
if ((unsigned)wr == pkt.size()) {
// Ok, all written.
- iob.setWatchFlags(in_flag);
+ iob.setWatches(in_flag);
return true;
}
outpkt_index = wr;
try {
outbuf.emplace_back(pkt);
- iob.setWatchFlags(in_flag | OUT_EVENTS);
+ iob.setWatches(in_flag | OUT_EVENTS);
return true;
}
catch (std::bad_alloc &baexc) {
return false;
}
else {
- iob.setWatchFlags(OUT_EVENTS);
+ iob.setWatches(OUT_EVENTS);
return true;
}
}
// TODO log error?
// TODO error response?
bad_conn_close = true;
- iob.setWatchFlags(OUT_EVENTS);
+ iob.setWatches(OUT_EVENTS);
}
else {
int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
- iob.setWatchFlags(IN_EVENTS | out_flags);
+ iob.setWatches(IN_EVENTS | out_flags);
}
return false;
outpkt_index = 0;
if (outbuf.empty() && ! oom_close) {
if (! bad_conn_close) {
- iob.setWatchFlags(IN_EVENTS);
+ iob.setWatches(IN_EVENTS);
}
else {
return true;
ControlConn::~ControlConn() noexcept
{
close(iob.fd);
- iob.deregisterWatch(loop);
+ iob.deregister(*loop);
// Clear service listeners
for (auto p : serviceKeyMap) {
class ControlConnWatcher : public EventLoop_t::BidiFdWatcher
{
- inline Rearm receiveEvent(EventLoop_t * loop, int fd, int flags) noexcept;
+ inline Rearm receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept;
- Rearm readReady(EventLoop_t * loop, int fd) noexcept override
+ Rearm readReady(EventLoop_t &loop, int fd) noexcept override
{
return receiveEvent(loop, fd, IN_EVENTS);
}
- Rearm writeReady(EventLoop_t * loop, int fd) noexcept override
+ Rearm writeReady(EventLoop_t &loop, int fd) noexcept override
{
return receiveEvent(loop, fd, OUT_EVENTS);
}
int fd; // TODO this is already stored, find a better way to access it.
EventLoop_t * eventLoop;
- void setWatchFlags(int flags)
+ void setWatches(int flags)
{
- EventLoop_t::BidiFdWatcher::setWatchFlags(eventLoop, flags);
+ EventLoop_t::BidiFdWatcher::setWatches(*eventLoop, flags);
}
- void registerWith(EventLoop_t *loop, int fd, int flags)
+ void registerWith(EventLoop_t &loop, int fd, int flags)
{
this->fd = fd;
- this->eventLoop = loop;
- BidiFdWatcher<EventLoop_t>::registerWith(loop, fd, flags);
+ this->eventLoop = &loop;
+ BidiFdWatcher<EventLoop_t>::addWatch(loop, fd, flags);
}
};
-inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t * loop, int fd, int flags) noexcept
+inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t &loop, int fd, int flags) noexcept
{
- return control_conn_cb(loop, this, flags);
+ return control_conn_cb(&loop, this, flags);
}
{
bad_conn_close = true;
oom_close = true;
- iob.setWatchFlags(OUT_EVENTS);
+ iob.setWatches(OUT_EVENTS);
}
// Process service event broadcast.
public:
ControlConn(EventLoop_t * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0)
{
- iob.registerWith(loop, fd, IN_EVENTS);
+ iob.registerWith(*loop, fd, IN_EVENTS);
active_control_conns++;
}
backup_vector.resize(backup_vector.size() + 1);
}
+ void unreserve() noexcept
+ {
+ backup_vector.resize(backup_vector.size() - 1);
+ }
+
void add(pid_t key, void *val) // throws std::bad_alloc
{
base_map[key] = val;
public:
void reserveChildWatch()
{
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
child_waiters.reserve();
}
+ void unreserveChildWatch() noexcept
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ child_waiters.unreserve();
+ }
+
void addChildWatch(pid_t child, void *val)
{
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
child_waiters.add(child, val);
}
void addReservedChildWatch(pid_t child, void *val) noexcept
{
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
child_waiters.add_from_reserve(child, val);
}
+
+ void addReservedChildWatch_nolock(pid_t child, void *val) noexcept
+ {
+ child_waiters.add_from_reserve(child, val);
+ }
+
+ void removeChildWatch(pid_t child) noexcept
+ {
+ std::lock_guard<decltype(Base::lock)> guard(Base::lock);
+ child_waiters.erase(child);
+ }
template <typename T> void init(T *loop_mech)
{
const static bool has_bidi_fd_watch = true;
const static bool has_separate_rw_fd_watches = false;
+ const static bool supports_childwatch_reservation = true;
};
}
}
+ void addBidiFdWatch(int fd, void *userdata, int flags)
+ {
+ // No implementation.
+ throw std::system_error(std::make_error_code(std::errc::not_supported));
+ }
+
void removeFdWatch(int fd, int flags) noexcept
{
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr);
removeFdWatch(fd, flags);
}
+ void removeBidiFdWatch(int fd) noexcept
+ {
+ // Shouldn't be called for epoll.
+ removeFdWatch(fd, IN_EVENTS | OUT_EVENTS);
+ }
+
// Note this will *replace* the old flags with the new, that is,
// it can enable *or disable* read/write events.
void enableFdWatch(int fd, void *userdata, int flags) noexcept
const static bool has_bidi_fd_watch = false;
const static bool has_separate_rw_fd_watches = true;
+ const static bool supports_childwatch_reservation = true;
};
#if defined(__OpenBSD__)
void addFdWatch(int fd, void *userdata, int flags)
{
// TODO kqueue doesn't support EVFILE_WRITE on file fd's :/
+ // Presumably they cause the kevent call to fail. We could maintain
+ // a separate set and use poll() (urgh).
short filter = (flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE;
}
}
+ void addBidiFdWatch(int fd, void *userdata, int flags)
+ {
+ struct kevent kev[2];
+ EV_SET(&kev[0], fd, EVFILT_READ, EV_ADD, 0, 0, userdata);
+ EV_SET(&kev[1], fd, EVFILE_WRITE, EV_ADD, 0, 0, userdata);
+
+ if (kevent(kqfd, kev, 2, nullptr, 0, nullptr) == -1) {
+ throw new std::system_error(errno, std::system_category());
+ }
+ }
+
void removeFdWatch(int fd, int flags)
{
removeFilter((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd);
{
removeFdWatch(fd, flags);
}
+
+ void removeBidiFdWatch(int fd) noexcept
+ {
+ struct kevent kev[2];
+ EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
+ EV_SET(&kev[1], fd, EVFILE_WRITE, EV_DELETE, 0, 0, userdata);
+
+ kevent(kqfd, kev, 2, nullptr, 0, nullptr);
+ }
void enableFdWatch(int fd, void *userdata, int flags)
{
#include <condition_variable>
#include <cstdint>
#include <cstddef>
+#include <system_error>
+
+#include <unistd.h>
+#include <fcntl.h>
#include "dasynq-mutex.h"
public:
typedef SigInfo &SigInfo_p;
- virtual Rearm gotSignal(EventLoop<T_Mutex> * eloop, int signo, SigInfo_p siginfo) = 0;
+ virtual Rearm received(EventLoop<T_Mutex> &eloop, int signo, SigInfo_p siginfo) = 0;
};
template <typename T_Mutex>
BaseFdWatcher() noexcept : BaseWatcher(WatchType::FD) { }
public:
- virtual Rearm gotEvent(EventLoop<T_Mutex> * eloop, int fd, int flags) = 0;
+ virtual Rearm fdEvent(EventLoop<T_Mutex> &eloop, int fd, int flags) = 0;
};
template <typename T_Mutex>
friend class dasynq::EventLoop<T_Mutex>;
// This should never actually get called:
- Rearm gotEvent(EventLoop<T_Mutex> * eloop, int fd, int flags) final
+ Rearm fdEvent(EventLoop<T_Mutex> &eloop, int fd, int flags) final
{
return Rearm::REARM; // should not be reachable.
};
int write_removed : 1; // write watch removed?
public:
- virtual Rearm readReady(EventLoop<T_Mutex> * eloop, int fd) noexcept = 0;
- virtual Rearm writeReady(EventLoop<T_Mutex> * eloop, int fd) noexcept = 0;
+ virtual Rearm readReady(EventLoop<T_Mutex> &eloop, int fd) noexcept = 0;
+ virtual Rearm writeReady(EventLoop<T_Mutex> &eloop, int fd) noexcept = 0;
};
template <typename T_Mutex>
BaseChildWatcher() : BaseWatcher(WatchType::CHILD) { }
public:
- virtual void gotTermStat(EventLoop<T_Mutex> * eloop, pid_t child, int status) = 0;
+ virtual Rearm childStatus(EventLoop<T_Mutex> &eloop, pid_t child, int status) = 0;
};
// Classes for implementing a fair(ish) wait queue.
lock.lock();
- // TODO this needs to handle multi-watch (BidiFdWatcher) properly
-
if (watcher->active) {
// If the watcher is active, set deleteme true; the watcher will be removed
// at the end of current processing (i.e. when active is set false).
friend class dprivate::SignalWatcher<EventLoop<T_Mutex>>;
friend class dprivate::ChildProcWatcher<EventLoop<T_Mutex>>;
+ public:
+ using LoopTraits = dasynq::LoopTraits;
+
+ private:
template <typename T, typename U> using EventDispatch = dprivate::EventDispatch<T,U>;
template <typename T> using waitqueue = dprivate::waitqueue<T>;
template <typename T> using waitqueue_node = dprivate::waitqueue_node<T>;
void registerFd(BaseBidiFdWatcher *callback, int fd, int eventmask)
{
if (LoopTraits::has_separate_rw_fd_watches) {
- // TODO
+ loop_mech.addBidiFdWatch(fd, callback, eventmask | ONE_SHOT);
}
else {
loop_mech.addFdWatch(fd, callback, eventmask | ONE_SHOT);
void deregister(BaseBidiFdWatcher *callback, int fd)
{
if (LoopTraits::has_separate_rw_fd_watches) {
- // TODO
+ loop_mech.removeBidiFdWatch(fd);
}
else {
loop_mech.removeFdWatch(fd, callback->watch_flags);
loop_mech.reserveChildWatch();
}
+ void unreserve(BaseChildWatcher *callBack)
+ {
+ loop_mech.unreserveChildWatch();
+ }
+
void registerChild(BaseChildWatcher *callBack, pid_t child)
{
loop_mech.addChildWatch(child, callBack);
{
loop_mech.addReservedChildWatch(child, callBack);
}
+
+ void registerReservedChild_nolock(BaseChildWatcher *callBack, pid_t child) noexcept
+ {
+ loop_mech.addReservedChildWatch_nolock(child, callBack);
+ }
+
+ void deregister(BaseChildWatcher *callback, pid_t child)
+ {
+ loop_mech.removeChildWatch(child);
+
+ waitqueue_node<T_Mutex> qnode;
+ getAttnLock(qnode);
+
+ EventDispatch<T_Mutex, LoopTraits> & ed = (EventDispatch<T_Mutex, LoopTraits> &) loop_mech;
+ ed.issueDelete(callback);
+
+ releaseLock(qnode);
+ }
void dequeueWatcher(BaseWatcher *watcher) noexcept
{
}
else if (pqueue->watchType == WatchType::SECONDARYFD) {
is_multi_watch = true;
+ // construct a pointer to the main watcher:
char * rp = (char *)pqueue;
rp -= offsetof(BaseBidiFdWatcher, outWatcher);
bbfw = (BaseBidiFdWatcher *)rp;
switch (pqueue->watchType) {
case WatchType::SIGNAL: {
BaseSignalWatcher *bsw = static_cast<BaseSignalWatcher *>(pqueue);
- rearmType = bsw->gotSignal(this, bsw->siginfo.get_signo(), bsw->siginfo);
+ rearmType = bsw->received(*this, bsw->siginfo.get_signo(), bsw->siginfo);
break;
}
case WatchType::FD: {
if (is_multi_watch) {
// The primary watcher for a multi-watch watcher is queued for
// read events.
- rearmType = bbfw->readReady(this, bfw->watch_fd);
+ rearmType = bbfw->readReady(*this, bfw->watch_fd);
}
else {
- rearmType = bfw->gotEvent(this, bfw->watch_fd, bfw->event_flags);
+ rearmType = bfw->fdEvent(*this, bfw->watch_fd, bfw->event_flags);
}
break;
}
case WatchType::CHILD: {
BaseChildWatcher *bcw = static_cast<BaseChildWatcher *>(pqueue);
- bcw->gotTermStat(this, bcw->watch_pid, bcw->child_status);
- // Child watches automatically remove:
- // TODO what if they want to return REMOVED...
- rearmType = Rearm::REMOVE;
+ rearmType = bcw->childStatus(*this, bcw->watch_pid, bcw->child_status);
break;
}
case WatchType::SECONDARYFD: {
- // first construct a pointer to the main watcher:
- rearmType = bbfw->writeReady(this, bbfw->watch_fd);
+ rearmType = bbfw->writeReady(*this, bbfw->watch_fd);
break;
}
default: ;
using SignalWatcher = dprivate::SignalWatcher<EventLoop<T_Mutex>>;
using ChildProcWatcher = dprivate::ChildProcWatcher<EventLoop<T_Mutex>>;
+ // using LoopTraits = dasynq::LoopTraits;
+
void run() noexcept
{
while (! processEvents()) {
// Register this watcher to watch the specified signal.
// If an attempt is made to register with more than one event loop at
- // a time, behaviour is undefined.
- inline void registerWatch(EventLoop *eloop, int signo)
+ // a time, behaviour is undefined. The signal should be masked before
+ // call.
+ inline void addWatch(EventLoop &eloop, int signo)
{
BaseWatcher::init();
this->siginfo.set_signo(signo);
- eloop->registerSignal(this, signo);
+ eloop.registerSignal(this, signo);
}
- inline void deregisterWatch(EventLoop *eloop) noexcept
+ inline void deregister(EventLoop &eloop) noexcept
{
- eloop->deregister(this, this->siginfo.get_signo());
+ eloop.deregister(this, this->siginfo.get_signo());
}
- // virtual Rearm gotSignal(EventLoop<T_Mutex> *, int signo, SigInfo_p info) = 0;
+ // virtual Rearm received(EventLoop &, int signo, SigInfo_p info) = 0;
};
// Posix file descriptor event watcher
// Set the types of event to watch. Only supported if LoopTraits::has_bidi_fd_watch
// is true; otherwise has unspecified behavior.
- // Only safe to call from within the callback handler (gotEvent). Might not take
+ // Only safe to call from within the callback handler (fdEvent). Might not take
// effect until the current callback handler returns with REARM.
void setWatchFlags(int newFlags)
{
// causes undefined behavior.
//
// Can fail with std::bad_alloc or std::system_error.
- void registerWith(EventLoop *eloop, int fd, int flags)
+ void addWatch(EventLoop &eloop, int fd, int flags)
{
BaseWatcher::init();
this->watch_fd = fd;
this->watch_flags = flags;
- eloop->registerFd(this, fd, flags);
+ eloop.registerFd(this, fd, flags);
}
// Deregister a file descriptor watcher.
// calling this method as long as the handler (if it is active) accesses no
// internal state and returns Rearm::REMOVED.
// TODO: implement REMOVED, or correct above statement.
- void deregisterWatch(EventLoop *eloop) noexcept
+ void deregister(EventLoop &eloop) noexcept
{
- eloop->deregister(this, this->watch_fd);
+ eloop.deregister(this, this->watch_fd);
}
- void setEnabled(EventLoop *eloop, bool enable) noexcept
+ void setEnabled(EventLoop &eloop, bool enable) noexcept
{
- std::lock_guard<T_Mutex> guard(eloop->getBaseLock());
- eloop->setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable);
+ std::lock_guard<T_Mutex> guard(eloop.getBaseLock());
+ eloop.setFdEnabled_nolock(this, this->watch_fd, this->watch_flags, enable);
if (! enable) {
- eloop->dequeueWatcher(this);
+ eloop.dequeueWatcher(this);
}
}
- // virtual Rearm gotEvent(EventLoop<T_Mutex> *, int fd, int flags) = 0;
+ // virtual Rearm fdEvent(EventLoop<T_Mutex> *, int fd, int flags) = 0;
};
// A Bi-directional file descriptor watcher with independent read- and write- channels.
using BaseWatcher = dprivate::BaseWatcher;
using T_Mutex = typename EventLoop::mutex_t;
- void setWatchEnabled(EventLoop *eloop, bool in, bool b)
+ void setWatchEnabled(EventLoop &eloop, bool in, bool b)
{
int events = in ? IN_EVENTS : OUT_EVENTS;
}
if (LoopTraits::has_separate_rw_fd_watches) {
dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher;
- eloop->setFdEnabled_nolock(watcher, this->watch_fd, events | ONE_SHOT, b);
+ eloop.setFdEnabled_nolock(watcher, this->watch_fd, events | ONE_SHOT, b);
if (! b) {
- eloop->dequeueWatcher(watcher);
+ eloop.dequeueWatcher(watcher);
}
}
else {
- eloop->setFdEnabled_nolock(this, this->watch_fd,
+ eloop.setFdEnabled_nolock(this, this->watch_fd,
(this->watch_flags & (IN_EVENTS | OUT_EVENTS)) | ONE_SHOT,
(this->watch_flags & (IN_EVENTS | OUT_EVENTS)) != 0);
if (! b) {
dprivate::BaseWatcher * watcher = in ? this : &this->outWatcher;
- eloop->dequeueWatcher(watcher);
+ eloop.dequeueWatcher(watcher);
}
}
}
// TODO if a watch is disabled and currently queued, we should de-queue it.
- void setInWatchEnabled(EventLoop *eloop, bool b) noexcept
+ void setInWatchEnabled(EventLoop &eloop, bool b) noexcept
{
- eloop->getBaseLock().lock();
+ eloop.getBaseLock().lock();
setWatchEnabled(eloop, true, b);
- eloop->getBaseLock().unlock();
+ eloop.getBaseLock().unlock();
}
- void setOutWatchEnabled(EventLoop *eloop, bool b) noexcept
+ void setOutWatchEnabled(EventLoop &eloop, bool b) noexcept
{
- eloop->getBaseLock().lock();
+ eloop.getBaseLock().lock();
setWatchEnabled(eloop, false, b);
- eloop->getBaseLock().unlock();
+ eloop.getBaseLock().unlock();
}
// Set the watch flags, which enables/disables both the in-watch and the out-watch accordingly.
/// - unless the event loop will not be polled while the watcher is active.
// (i.e. it is ok to call setWatchFlags from within the readReady/writeReady handlers if no other
// thread will poll the event loop; it is ok to *dis*able a watcher that might be active).
- void setWatchFlags(EventLoop * eloop, int newFlags)
+ void setWatches(EventLoop &eloop, int newFlags)
{
- std::lock_guard<T_Mutex> guard(eloop->getBaseLock());
+ std::lock_guard<T_Mutex> guard(eloop.getBaseLock());
if (LoopTraits::has_separate_rw_fd_watches) {
setWatchEnabled(eloop, true, (newFlags & IN_EVENTS) != 0);
setWatchEnabled(eloop, false, (newFlags & OUT_EVENTS) != 0);
}
else {
this->watch_flags = (this->watch_flags & ~IO_EVENTS) | newFlags;
- eloop->setFdEnabled((dprivate::BaseWatcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true);
+ eloop.setFdEnabled((dprivate::BaseWatcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true);
}
}
// can be any combination of dasynq::IN_EVENTS / dasynq::OUT_EVENTS.
//
// Can fail with std::bad_alloc or std::system_error.
- void registerWith(EventLoop *eloop, int fd, int flags)
+ void addWatch(EventLoop &eloop, int fd, int flags)
{
BaseWatcher::init();
this->outWatcher.BaseWatcher::init();
this->watch_fd = fd;
this->watch_flags = flags | dprivate::multi_watch;
- eloop->registerFd(this, fd, flags);
+ eloop.registerFd(this, fd, flags);
}
// Deregister a bi-direction file descriptor watcher.
// calling this method as long as the handler (if it is active) accesses no
// internal state and returns Rearm::REMOVED.
// TODO: implement REMOVED, or correct above statement.
- void deregisterWatch(EventLoop *eloop) noexcept
+ void deregister(EventLoop &eloop) noexcept
{
- eloop->deregister(this, this->watch_fd);
+ eloop.deregister(this, this->watch_fd);
}
// Rearm readReady(EventLoop<T_Mutex> * eloop, int fd) noexcept
public:
// Reserve resources for a child watcher with the given event loop.
- // Reservation can fail with std::bad_alloc.
- void reserveWith(EventLoop *eloop)
+ // Reservation can fail with std::bad_alloc. Some backends do not support
+ // reservation (it will always fail) - check LoopTraits::supports_childwatch_reservation.
+ void reserveWatch(EventLoop &eloop)
{
- eloop->reserveChildWatch();
+ eloop.reserveChildWatch(this);
+ }
+
+ void unreserve(EventLoop &eloop)
+ {
+ eloop.unreserve(this);
}
// Register a watcher for the given child process with an event loop.
// Registration can fail with std::bad_alloc.
- void registerWith(EventLoop *eloop, pid_t child)
+ // Note that in multi-threaded programs, use of this function may be prone to a
+ // race condition such that the child terminates before the watcher is registered.
+ void addWatch(EventLoop &eloop, pid_t child)
{
BaseWatcher::init();
this->watch_pid = child;
- eloop->registerChild(this, child);
+ eloop.registerChild(this, child);
}
// Register a watcher for the given child process with an event loop,
// after having reserved resources previously (using reserveWith).
// Registration cannot fail.
- void registerReserved(EventLoop *eloop, pid_t child) noexcept
+ // Note that in multi-threaded programs, use of this function may be prone to a
+ // race condition such that the child terminates before the watcher is registered.
+ void addReserved(EventLoop &eloop, pid_t child) noexcept
{
BaseWatcher::init();
- eloop->registerReservedChild(this, child);
+ eloop.registerReservedChild(this, child);
+ }
+
+ void deregister(EventLoop &eloop, pid_t child) noexcept
+ {
+ eloop.deregister(this, child);
+ }
+
+ // Fork and watch the child with this watcher on the given event loop.
+ // If resource limitations prevent the child process from being watched, it is
+ // terminated immediately (or if the implementation allows, never started),
+ // and a suitable std::system_error or std::bad_alloc exception is thrown.
+ // Returns:
+ // - the child pid in the parent
+ // - 0 in the child
+ pid_t fork(EventLoop &eloop)
+ {
+ if (EventLoop::LoopTraits::supports_childwatch_reservation) {
+ // Reserve a watch, fork, then claim reservation
+ reserveWatch(eloop);
+
+ auto &lock = eloop.getBaseLock();
+ lock.lock();
+
+ pid_t child = ::fork();
+ if (child == -1) {
+ // Unreserve watch.
+ lock.unlock();
+ unreserve(eloop);
+ throw std::system_error(errno, std::system_category());
+ }
+
+ if (child == 0) {
+ // I am the child
+ lock.unlock(); // may not really be necessary
+ return 0;
+ }
+
+ // Register this watcher.
+ eloop.registerReservedChild_nolock(this, child);
+ lock.unlock();
+ return child;
+ }
+ else {
+ int pipefds[2];
+ if (pipe2(pipefds, O_CLOEXEC) == -1) {
+ throw std::system_error(errno, std::system_category());
+ }
+
+ std::lock_guard<T_Mutex> guard(eloop.getBaseLock());
+
+ pid_t child = ::fork();
+ if (child == -1) {
+ throw std::system_error(errno, std::system_category());
+ }
+
+ if (child == 0) {
+ // I am the child
+
+ // Wait for message from parent before continuing:
+ int rr;
+ int r = read(pipefds[0], &rr, sizeof(rr));
+ while (r == -1 && errno == EINTR) {
+ read(pipefds[0], &rr, sizeof(rr));
+ }
+
+ if (r == -1) _exit(0);
+
+ close(pipefds[0]);
+ return 0;
+ }
+
+ close(pipefds[0]); // close read end
+
+ // Register this watcher.
+ try {
+ eloop.registerChild(this, child);
+
+ // Continue in child (it doesn't matter what is written):
+ write(pipefds[1], &pipefds, sizeof(int));
+ close(pipefds[1]);
+
+ return child;
+ }
+ catch (...) {
+ close(pipefds[1]);
+ throw;
+ }
+ }
}
- // virtual void gotTermStat(EventLoop<T_Mutex> *, pid_t child, int status) = 0;
+ // virtual Rearm childStatus(EventLoop &, pid_t child, int status) = 0;
};
} // namespace dasynq::dprivate
release = false;
}
- Rearm gotEvent(EventLoop_t *loop, int fd, int flags) noexcept override;
+ Rearm fdEvent(EventLoop_t &loop, int fd, int flags) noexcept override;
// Check whether the console can be released.
void flushForRelease();
bool was_first = current_index == 0;
current_index = log_buffer.get_length();
if (was_first && ! release) {
- setEnabled(&eventLoop, true);
+ setEnabled(eventLoop, true);
}
}
// Try to flush any messages that are currently buffered. (Console is non-blocking
// so it will fail gracefully).
- if (gotEvent(&eventLoop, fd, OUT_EVENTS) == Rearm::DISARM) {
+ if (fdEvent(eventLoop, fd, OUT_EVENTS) == Rearm::DISARM) {
// Console has already been released at this point.
- setEnabled(&eventLoop, false);
+ setEnabled(eventLoop, false);
}
- // gotEvent didn't want to disarm, so must be partway through a message; will
+ // fdEvent didn't want to disarm, so must be partway through a message; will
// release when it's finished.
}
-Rearm BufferedLogStream::gotEvent(EventLoop_t *loop, int fd, int flags) noexcept
+Rearm BufferedLogStream::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept
{
auto &log_stream = *this;
void init_log(ServiceSet *sset)
{
service_set = sset;
- log_stream[DLOG_CONS].registerWith(&eventLoop, STDOUT_FILENO, OUT_EVENTS); // TODO register in disabled state
+ log_stream[DLOG_CONS].addWatch(eventLoop, STDOUT_FILENO, OUT_EVENTS); // TODO register in disabled state
enable_console_log(true);
}
void setup_main_log(int fd)
{
log_stream[DLOG_MAIN].init(fd);
- log_stream[DLOG_MAIN].registerWith(&eventLoop, fd, OUT_EVENTS);
+ log_stream[DLOG_MAIN].addWatch(eventLoop, fd, OUT_EVENTS);
}
bool is_log_flushed() noexcept
fcntl(1, F_SETFL, flags | O_NONBLOCK);
// Activate watcher:
log_stream[DLOG_CONS].init(STDOUT_FILENO);
- log_stream[DLOG_CONS].setEnabled(&eventLoop, true);
+ log_stream[DLOG_CONS].setEnabled(eventLoop, true);
}
else if (! enable && log_to_console) {
log_stream[DLOG_CONS].flushForRelease();
class ControlSocketWatcher : public EventLoop_t::FdWatcher
{
- Rearm gotEvent(EventLoop_t * loop, int fd, int flags)
+ Rearm fdEvent(EventLoop_t &loop, int fd, int flags) override
{
- control_socket_cb(loop, fd);
+ control_socket_cb(&loop, fd);
return Rearm::REARM;
}
// TODO the fd is already stored, must we really store it again...
int fd;
- void registerWith(EventLoop_t * loop, int fd, int flags)
+ void addWatch(EventLoop_t &loop, int fd, int flags)
{
this->fd = fd;
- EventLoop_t::FdWatcher::registerWith(loop, fd, flags);
+ EventLoop_t::FdWatcher::addWatch(loop, fd, flags);
}
};
this->cb_func = cb_func;
}
- Rearm gotSignal(EventLoop_t * eloop, int signo, SigInfo_p siginfo) override
+ Rearm received(EventLoop_t &eloop, int signo, SigInfo_p siginfo) override
{
service_set->stop_all_services(ShutdownType::REBOOT);
return Rearm::REARM;
class ControlSocketWatcher : public EventLoop_t::FdWatcher
{
- Rearm gotEvent(EventLoop_t * loop, int fd, int flags)
+ Rearm fdEvent(EventLoop_t &loop, int fd, int flags)
{
- control_socket_cb(loop, fd);
+ control_socket_cb(&loop, fd);
return Rearm::REARM;
}
};
auto sigterm_watcher = CallbackSignalHandler(sigterm_cb);
- sigint_watcher.registerWatch(&eventLoop, SIGINT);
- sigquit_watcher.registerWatch(&eventLoop, SIGQUIT);
- sigterm_watcher.registerWatch(&eventLoop, SIGTERM);
+ sigint_watcher.addWatch(eventLoop, SIGINT);
+ sigquit_watcher.addWatch(eventLoop, SIGQUIT);
+ sigterm_watcher.addWatch(eventLoop, SIGTERM);
// Try to open control socket (may fail due to readonly filesystem)
open_control_socket(&eventLoop);
}
control_socket_open = true;
- control_socket_io.registerWith(&eventLoop, sockfd, IN_EVENTS);
+ control_socket_io.addWatch(eventLoop, sockfd, IN_EVENTS);
}
}
{
if (control_socket_open) {
int fd = control_socket_io.fd;
- control_socket_io.deregisterWatch(&eventLoop);
+ control_socket_io.deregister(*loop);
close(fd);
// Unlink the socket:
}
}
-void ServiceChildWatcher::gotTermStat(EventLoop_t * loop, pid_t child, int status) noexcept
+dasynq::Rearm ServiceChildWatcher::childStatus(EventLoop_t &loop, pid_t child, int status) noexcept
{
ServiceRecord *sr = service;
if (sr->waiting_for_execstat) {
// We still don't have an exec() status from the forked child, wait for that
// before doing any further processing.
- return;
+ return Rearm::REMOVE;
}
- sr->handle_exit_status();
+ // Must deregister now since handle_exit_status might result in re-launch:
+ deregister(loop, child);
+
+ sr->handle_exit_status();
+ return Rearm::REMOVED;
}
bool ServiceRecord::do_auto_restart() noexcept
}
}
-Rearm ServiceIoWatcher::gotEvent(EventLoop_t *loop, int fd, int flags) noexcept
+Rearm ServiceIoWatcher::fdEvent(EventLoop_t &loop, int fd, int flags) noexcept
{
- ServiceRecord::process_child_status(loop, this, flags);
+ ServiceRecord::process_child_status(&loop, this, flags);
return Rearm::REMOVED;
}
int exec_status;
int r = read(stat_io->fd, &exec_status, sizeof(int));
close(stat_io->fd);
- stat_io->deregisterWatch(loop);
+ stat_io->deregister(*loop);
if (r > 0) {
// We read an errno code; exec() failed, and the service startup failed.
pidbuf[r] = 0; // store nul terminator
pid = std::atoi(pidbuf);
if (kill(pid, 0) == 0) {
- child_listener.registerWith(&eventLoop, pid);
+ child_listener.addWatch(eventLoop, pid);
}
else {
log(LogLevel::ERROR, service_name, ": pid read from pidfile (", pid, ") is not valid");
// TODO make sure pipefd's are not 0/1/2 (STDIN/OUT/ERR) - if they are, dup them
// until they are not.
-
- pid_t forkpid = fork();
- if (forkpid == -1) {
- // TODO log error
+
+ pid_t forkpid;
+
+ bool child_status_registered = false;
+ try {
+ child_status_listener.addWatch(eventLoop, pipefd[0], IN_EVENTS);
+ child_status_registered = true;
+
+ forkpid = child_listener.fork(eventLoop);
+ }
+ catch (...) {
+ if (child_status_registered) {
+ child_status_listener.deregister(eventLoop);
+ }
close(pipefd[0]);
close(pipefd[1]);
return false;
else {
// Parent process
close(pipefd[1]); // close the 'other end' fd
-
pid = forkpid;
- // Listen for status
- // TODO should set this up earlier so we can handle failure case (exception)
- child_status_listener.registerWith(&eventLoop, pipefd[0], IN_EVENTS);
-
- // Add a process listener so we can detect when the
- // service stops
- // TODO should reserve listener, handle exceptions
- child_listener.registerWith(&eventLoop, pid);
waiting_for_execstat = true;
return true;
}
public:
// TODO resolve clunkiness of storing this field
ServiceRecord * service;
- void gotTermStat(EventLoop_t * eloop, pid_t child, int status) noexcept;
+ Rearm childStatus(EventLoop_t &eloop, pid_t child, int status) noexcept;
ServiceChildWatcher(ServiceRecord * sr) noexcept : service(sr) { }
};
// TODO resolve clunkiness of storing these fields
int fd;
ServiceRecord * service;
- Rearm gotEvent(EventLoop_t * eloop, int fd, int flags) noexcept;
+ Rearm fdEvent(EventLoop_t &eloop, int fd, int flags) noexcept;
ServiceIoWatcher(ServiceRecord * sr) noexcept : service(sr) { }
- void registerWith(EventLoop_t *loop, int fd, int flags)
+ void addWatch(EventLoop_t &loop, int fd, int flags)
{
this->fd = fd;
- EventLoop_t::FdWatcher::registerWith(loop, fd, flags);
+ EventLoop_t::FdWatcher::addWatch(loop, fd, flags);
}
};