1 #include <system_error>
3 #include <unordered_map>
16 #include "dasynq-config.h"
18 // "kqueue"-based event loop mechanism.
20 // kqueue is available on BSDs and Mac OS X, though there are subtle differences from OS to OS.
22 // kqueue supports watching file descriptors (input and output as separate watches only),
23 // signals, child processes, and timers. Unfortunately support for the latter two is imperfect;
24 // it is not possible to reserve process watches in advance; timers can only be active, count
25 // down immediately when created, and cannot be reset to another time. For timers especially
26 // the problems are significant: we can't allocate timers in advance, and we can't even feasibly
27 // manage our own timer queue via a single kqueue-backed timer. Therefore, an alternate timer
28 // mechanism must be used together with kqueue.
32 template <class Base> class kqueue_loop;
36 template <class Base> friend class kqueue_loop;
42 template <class Base> friend class kqueue_loop;
48 int get_signo() { return info.si_signo; }
49 int get_sicode() { return info.si_code; }
50 pid_t get_sipid() { return info.si_pid; }
51 uid_t get_siuid() { return info.si_uid; }
52 void * get_siaddr() { return info.si_addr; }
53 int get_sistatus() { return info.si_status; }
54 int get_sival_int() { return info.si_value.sival_int; }
55 void * get_sival_ptr() { return info.si_value.sival_ptr; }
58 int get_sierrno() { return info.si_errno; }
// XSR (streams) OB (obsolete)
61 #if !defined(__OpenBSD__)
62 // Note: OpenBSD doesn't have this; most other systems do. Technically it is part of the STREAMS
64 int get_siband() { return info.si_band; }
67 void set_signo(int signo) { info.si_signo = signo; }
72 // File descriptor optional storage. If the mechanism can return the file descriptor, this
73 // class will be empty, otherwise it can hold a file descriptor.
81 // File descriptor reference (passed to event callback). If the mechanism can return the
82 // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor
83 // must be stored in an fd_s instance.
91 fd_r(int nfd) : fd(nfd)
96 constexpr static bool has_bidi_fd_watch = false;
97 constexpr static bool has_separate_rw_fd_watches = true;
98 constexpr static bool interrupt_after_fd_add = false;
99 constexpr static bool interrupt_after_signal_add = false;
100 constexpr static bool supports_non_oneshot_fd = false;
103 #if _POSIX_REALTIME_SIGNALS > 0
104 static inline void prepare_signal(int signo) { }
105 static inline void unprep_signal(int signo) { }
107 inline bool get_siginfo(int signo, siginfo_t *siginfo)
109 struct timespec timeout;
115 sigaddset(&mask, signo);
116 return (sigtimedwait(&mask, siginfo, &timeout) != -1);
120 // If we have no sigtimedwait implementation, we have to retrieve signal data by establishing a
123 // We need to declare and define a non-static data variable, "siginfo_p", in this header, without
124 // violating the "one definition rule". The only way to do that is via a template, even though we
125 // don't otherwise need a template here:
126 template <typename T = decltype(nullptr)> class sig_capture_templ
129 static siginfo_t * siginfo_p;
131 static void signalHandler(int signo, siginfo_t *siginfo, void *v)
133 *siginfo_p = *siginfo;
136 template <typename T> siginfo_t * sig_capture_templ<T>::siginfo_p = nullptr;
138 using sig_capture = sig_capture_templ<>;
140 inline void prepare_signal(int signo)
142 struct sigaction the_action;
143 the_action.sa_sigaction = sig_capture::signalHandler;
144 the_action.sa_flags = SA_SIGINFO;
145 sigfillset(&the_action.sa_mask);
147 sigaction(signo, &the_action, nullptr);
150 inline void unprep_signal(int signo)
152 signal(signo, SIG_DFL);
155 inline bool get_siginfo(int signo, siginfo_t *siginfo)
157 sig_capture::siginfo_p = siginfo;
161 sigdelset(&mask, signo);
168 template <class Base> class kqueue_loop : public Base
170 int kqfd; // kqueue fd
172 // The kqueue signal reporting mechanism *coexists* with the regular signal
173 // delivery mechanism without having any connection to it. Whereas regular signals can be
174 // queued (especially "realtime" signals, via sigqueue()), kqueue just maintains a counter
175 // of delivery attempts and clears this when we read the event. What this means is that
176 // kqueue won't necessarily tell us if signals are pending, in the case that:
177 // 1) it already reported the attempted signal delivery and
// 2) more than one instance of the same signal was pending at that time and
179 // 3) no more deliveries of the same signal have been attempted in the meantime.
180 // Also, kqueue won't tell us about signals that were pending at the time the signal filter
181 // was added. Finally, because pending signals can be merged, the count of delivery attempts
182 // provided by kqueue does not necessarily match the number of signals actually pending.
184 // Note that POSIX allows for multiple instances of a signal to be pending even on systems
185 // that don't support queueing of signals.
187 // Ultimately, this means we need to check for pending signals independently of what kqueue
191 // lock - a lock that can be used to protect internal structure.
192 // receive*() methods will be called with lock held.
193 // receive_signal(sigdata_t &, user *) noexcept
194 // receive_fd_event(fd_r, user *, int flags) noexcept
196 using sigdata_t = kqueue_traits::sigdata_t;
197 using fd_r = typename kqueue_traits::fd_r;
199 // The flag to specify poll() semantics for regular file readiness: that is, we want
200 // ready-for-read to be returned even at end of file:
201 #if defined(NOTE_FILE_POLL)
203 constexpr static int POLL_SEMANTICS = NOTE_FILE_POLL;
205 // Note that macOS has an "EV_POLL" defined that looks like it should give poll semantics
206 // when passed as a flag. However, it is filtered at the syscall entry so we cannot use it in
207 // kqueue. (The kernel uses it internally to implement poll()).
208 constexpr static int POLL_SEMANTICS = 0;
211 void process_events(struct kevent *events, int r)
213 std::lock_guard<decltype(Base::lock)> guard(Base::lock);
215 for (int i = 0; i < r; i++) {
216 if (events[i].filter == EVFILT_SIGNAL) {
217 bool reenable = pull_signal(events[i].ident, events[i].udata);
218 events[i].flags = reenable ? EV_ENABLE : EV_DISABLE;
220 else if (events[i].filter == EVFILT_READ || events[i].filter == EVFILT_WRITE) {
221 int flags = events[i].filter == EVFILT_READ ? IN_EVENTS : OUT_EVENTS;
222 auto r = Base::receive_fd_event(*this, fd_r(events[i].ident), events[i].udata, flags);
223 if (std::get<0>(r) == 0) {
224 // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for
225 // another connection).
226 events[i].flags = EV_DISABLE | EV_CLEAR;
229 events[i].flags = EV_ENABLE;
233 events[i].flags = EV_DISABLE;
237 // Now we disable all received events, to simulate EV_DISPATCH:
238 kevent(kqfd, events, r, nullptr, 0, nullptr);
241 // Pull a signal from pending, and report it, until it is no longer pending or the watch
242 // should be disabled. Call with lock held.
243 // Returns: true if watcher should be enabled, false if disabled.
244 bool pull_signal(int signo, void *userdata)
246 bool enable_filt = true;
249 #if _POSIX_REALTIME_SIGNALS > 0
250 struct timespec timeout = {0, 0};
252 sigemptyset(&sigw_mask);
253 sigaddset(&sigw_mask, signo);
254 int rsigno = sigtimedwait(&sigw_mask, &siginfo.info, &timeout);
256 if (Base::receive_signal(*this, siginfo, userdata)) {
260 rsigno = sigtimedwait(&sigw_mask, &siginfo.info, &timeout);
263 // we have no sigtimedwait.
264 sigset_t pending_sigs;
265 sigpending(&pending_sigs);
266 while (sigismember(&pending_sigs, signo)) {
267 get_siginfo(signo, &siginfo.info);
268 if (Base::receive_signal(*this, siginfo, userdata)) {
272 sigpending(&pending_sigs);
281 * kqueue_loop constructor.
283 * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
289 throw std::system_error(errno, std::system_category());
299 void set_filter_enabled(short filterType, uintptr_t ident, void *udata, bool enable)
301 // Note, on OpenBSD enabling or disabling filter will not alter the filter parameters (udata etc);
302 // on OS X however, it will. Therefore we set udata here (to the same value as it was originally
303 // set) in order to work correctly on both kernels.
305 int fflags = (filterType == EVFILT_READ) ? POLL_SEMANTICS : 0;
306 EV_SET(&kev, ident, filterType, enable ? EV_ENABLE : EV_DISABLE, fflags, 0, udata);
307 kevent(kqfd, &kev, 1, nullptr, 0, nullptr);
310 void remove_filter(short filterType, uintptr_t ident)
313 EV_SET(&kev, ident, filterType, EV_DELETE, 0, 0, 0);
314 kevent(kqfd, &kev, 1, nullptr, 0, nullptr);
317 // fd: file descriptor to watch
318 // userdata: data to associate with descriptor
319 // flags: IN_EVENTS | OUT_EVENTS | ONE_SHOT
320 // (only one of IN_EVENTS/OUT_EVENTS can be specified)
321 // soft_fail: true if unsupported file descriptors should fail by returning false instead
322 // of throwing an exception
323 // returns: true on success; false if file descriptor type isn't supported and emulate == true
324 // throws: std::system_error or std::bad_alloc on failure
325 bool add_fd_watch(int fd, void *userdata, int flags, bool enabled = true, bool emulate = false)
327 short filter = (flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE;
329 if (filter == EVFILT_READ && POLL_SEMANTICS == 0 && emulate) {
330 // We can't request poll semantics, so check for regular file:
332 if (fstat(fd, &statbuf) == -1) {
333 throw new std::system_error(errno, std::system_category());
335 if ((statbuf.st_mode & S_IFMT) == S_IFREG) {
336 // Regular file: emulation required
341 int fflags = (filter == EVFILT_READ) ? POLL_SEMANTICS : 0;
344 EV_SET(&kev, fd, filter, EV_ADD | (enabled ? 0 : EV_DISABLE), fflags, 0, userdata);
345 if (kevent(kqfd, &kev, 1, nullptr, 0, nullptr) == -1) {
346 // Note that kqueue supports EVFILT_READ on regular file fd's, but not EVFILT_WRITE.
347 if (filter == EVFILT_WRITE && errno == EINVAL && emulate) {
348 return false; // emulate
350 throw new std::system_error(errno, std::system_category());
355 // returns: 0 on success
356 // IN_EVENTS if in watch requires emulation
357 // OUT_EVENTS if out watch requires emulation
358 int add_bidi_fd_watch(int fd, void *userdata, int flags, bool emulate = false)
361 struct kevent kev[2];
362 struct kevent kev_r[2];
363 short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT;
364 short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT;
365 EV_SET(&kev[0], fd, EVFILT_READ, rflags, POLL_SEMANTICS, 0, userdata);
366 EV_SET(&kev[1], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
368 int r = kevent(kqfd, kev, 2, kev_r, 2, nullptr);
371 throw new std::system_error(errno, std::system_category());
374 // Some possibilities:
375 // - both ends failed. We'll throw an error rather than allowing emulation.
376 // - read watch failed, write succeeded : should not happen.
377 // - read watch added, write failed: if emulate == true, succeed;
378 // if emulate == false, remove read and fail.
380 if (kev_r[0].data != 0) {
382 throw new std::system_error(kev_r[0].data, std::system_category());
385 if (kev_r[1].data != 0) {
387 // We can emulate, but, do we have correct semantics?
388 if (POLL_SEMANTICS != 0) {
392 // if we can't get poll semantics, emulate for read as well:
393 // first remove read watch:
394 EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
395 kevent(kqfd, kev, 1, nullptr, 0, nullptr);
396 return IN_EVENTS | OUT_EVENTS;
399 EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
400 kevent(kqfd, kev, 1, nullptr, 0, nullptr);
402 throw new std::system_error(kev_r[1].data, std::system_category());
407 // OpenBSD doesn't have EV_RECEIPT: install the watches one at a time
408 struct kevent kev[1];
410 short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE);
411 short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE);
412 EV_SET(&kev[0], fd, EVFILT_READ, rflags, 0, 0, userdata);
414 int r = kevent(kqfd, kev, 1, nullptr, 0, nullptr);
417 throw new std::system_error(errno, std::system_category());
420 EV_SET(&kev[0], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
422 r = kevent(kqfd, kev, 1, nullptr, 0, nullptr);
429 EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
430 kevent(kqfd, kev, 1, nullptr, 0, nullptr);
432 throw new std::system_error(errno, std::system_category());
439 // flags specifies which watch to remove; ignored if the loop doesn't support
440 // separate read/write watches.
441 void remove_fd_watch(int fd, int flags)
443 remove_filter((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd);
446 void remove_fd_watch_nolock(int fd, int flags)
448 remove_fd_watch(fd, flags);
451 void remove_bidi_fd_watch(int fd) noexcept
453 struct kevent kev[2];
454 EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
455 EV_SET(&kev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
457 kevent(kqfd, kev, 2, nullptr, 0, nullptr);
460 void enable_fd_watch(int fd, void *userdata, int flags)
462 set_filter_enabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, userdata, true);
465 void enable_fd_watch_nolock(int fd, void *userdata, int flags)
467 enable_fd_watch(fd, userdata, flags);
470 void disable_fd_watch(int fd, int flags)
472 set_filter_enabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, nullptr, false);
475 void disable_fd_watch_nolock(int fd, int flags)
477 disable_fd_watch(fd, flags);
480 // Note signal should be masked before call.
481 void add_signal_watch(int signo, void *userdata)
483 std::lock_guard<decltype(Base::lock)> guard(Base::lock);
484 add_signal_watch_nolock(signo, userdata);
487 // Note signal should be masked before call.
488 void add_signal_watch_nolock(int signo, void *userdata)
490 prepare_signal(signo);
492 // We need to register the filter with the kqueue early, to avoid a race where we miss
495 EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ADD | EV_DISABLE, 0, 0, userdata);
496 if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) {
497 throw new std::system_error(errno, std::system_category());
499 // TODO use EV_DISPATCH if available (not on OpenBSD/OS X)
501 // The signal might be pending already but won't be reported by kqueue in that case. We can queue
502 // it immediately (note that it might be pending multiple times, so we need to re-check once signal
503 // processing finishes if it is re-armed).
505 bool enable_filt = pull_signal(signo, userdata);
508 evt.flags = EV_ENABLE;
509 if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) {
510 throw new std::system_error(errno, std::system_category());
515 // Note, called with lock held:
516 void rearm_signal_watch_nolock(int signo, void *userdata) noexcept
518 if (pull_signal(signo, userdata)) {
520 EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ENABLE, 0, 0, userdata);
521 // TODO use EV_DISPATCH if available (not on OpenBSD)
523 kevent(kqfd, &evt, 1, nullptr, 0, nullptr);
527 void remove_signal_watch_nolock(int signo) noexcept
529 unprep_signal(signo);
532 EV_SET(&evt, signo, EVFILT_SIGNAL, EV_DELETE, 0, 0, 0);
534 kevent(kqfd, &evt, 1, nullptr, 0, nullptr);
537 void remove_signal_watch(int signo) noexcept
539 std::lock_guard<decltype(Base::lock)> guard(Base::lock);
540 remove_signal_watch_nolock(signo);
545 // If events are pending, process an unspecified number of them.
546 // If no events are pending, wait until one event is received and
547 // process this event (and possibly any other events received
549 // If processing an event removes a watch, there is a possibility
550 // that the watched event will still be reported (if it has
551 // occurred) before pull_events() returns.
553 // do_wait - if false, returns immediately if no events are
555 void pull_events(bool do_wait)
557 struct kevent events[16];
562 int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts);
563 if (r == -1 || r == 0) {
564 // signal or no events
569 process_events(events, r);
570 r = kevent(kqfd, nullptr, 0, events, 16, &ts);