Upgrade bundled Dasynq to 1.1.2.
[oweals/dinit.git] / src / dasynq / dasynq-kqueue.h
1 #include <system_error>
2 #include <type_traits>
3 #include <unordered_map>
4 #include <vector>
5 #include <tuple>
6
7 #include <sys/event.h>
8 #include <sys/time.h>
9 #include <sys/types.h>
10 #include <sys/wait.h>
11 #include <sys/stat.h>
12
13 #include <unistd.h>
14 #include <signal.h>
15
16 #include "dasynq-config.h"
17
18 // "kqueue"-based event loop mechanism.
19 //
20 // kqueue is available on BSDs and Mac OS X, though there are subtle differences from OS to OS.
21 //
22 // kqueue supports watching file descriptors (input and output as separate watches only),
23 // signals, child processes, and timers. Unfortunately support for the latter two is imperfect;
24 // it is not possible to reserve process watches in advance; timers can only be active, count
25 // down immediately when created, and cannot be reset to another time. For timers especially
26 // the problems are significant: we can't allocate timers in advance, and we can't even feasibly
27 // manage our own timer queue via a single kqueue-backed timer. Therefore, an alternate timer
28 // mechanism must be used together with kqueue.
29
30 namespace dasynq {
31
32 template <class Base> class kqueue_loop;
33
34 class kqueue_traits
35 {
36     template <class Base> friend class kqueue_loop;
37
38     public:
39
40     class sigdata_t
41     {
42         template <class Base> friend class kqueue_loop;
43         
44         siginfo_t info;
45         
46         public:
47         // mandatory:
48         int get_signo() { return info.si_signo; }
49         int get_sicode() { return info.si_code; }
50         pid_t get_sipid() { return info.si_pid; }
51         uid_t get_siuid() { return info.si_uid; }
52         void * get_siaddr() { return info.si_addr; }
53         int get_sistatus() { return info.si_status; }
54         int get_sival_int() { return info.si_value.sival_int; }
55         void * get_sival_ptr() { return info.si_value.sival_ptr; }
56         
57         // XSI
58         int get_sierrno() { return info.si_errno; }
59
60         // XSR (streams) OB (obselete)
61 #if !defined(__OpenBSD__)
62         // Note: OpenBSD doesn't have this; most other systems do. Technically it is part of the STREAMS
63         // interface.
64         int get_siband() { return info.si_band; }
65 #endif
66
67         void set_signo(int signo) { info.si_signo = signo; }
68     };    
69
70     class fd_r;
71
72     // File descriptor optional storage. If the mechanism can return the file descriptor, this
73     // class will be empty, otherwise it can hold a file descriptor.
74     class fd_s {
75         public:
76         fd_s(int) { }
77
78         DASYNQ_EMPTY_BODY
79     };
80
81     // File descriptor reference (passed to event callback). If the mechanism can return the
82     // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor
83     // must be stored in an fd_s instance.
84     class fd_r {
85         int fd;
86         public:
87         int getFd(fd_s ss)
88         {
89             return fd;
90         }
91         fd_r(int nfd) : fd(nfd)
92         {
93         }
94     };
95     
96     constexpr static bool has_bidi_fd_watch = false;
97     constexpr static bool has_separate_rw_fd_watches = true;
98     constexpr static bool interrupt_after_fd_add = false;
99     constexpr static bool interrupt_after_signal_add = false;
100     constexpr static bool supports_non_oneshot_fd = false;
101 };
102
103 #if _POSIX_REALTIME_SIGNALS > 0
104 static inline void prepare_signal(int signo) { }
105 static inline void unprep_signal(int signo) { }
106
107 inline bool get_siginfo(int signo, siginfo_t *siginfo)
108 {
109     struct timespec timeout;
110     timeout.tv_sec = 0;
111     timeout.tv_nsec = 0;
112
113     sigset_t mask;
114     sigemptyset(&mask);
115     sigaddset(&mask, signo);
116     return (sigtimedwait(&mask, siginfo, &timeout) != -1);
117 }
118 #else
119
120 // If we have no sigtimedwait implementation, we have to retrieve signal data by establishing a
121 // signal handler.
122
123 // We need to declare and define a non-static data variable, "siginfo_p", in this header, without
124 // violating the "one definition rule". The only way to do that is via a template, even though we
125 // don't otherwise need a template here:
126 template <typename T = decltype(nullptr)> class sig_capture_templ
127 {
128     public:
129     static siginfo_t * siginfo_p;
130
131     static void signalHandler(int signo, siginfo_t *siginfo, void *v)
132     {
133         *siginfo_p = *siginfo;
134     }
135 };
136 template <typename T> siginfo_t * sig_capture_templ<T>::siginfo_p = nullptr;
137
138 using sig_capture = sig_capture_templ<>;
139
140 inline void prepare_signal(int signo)
141 {
142     struct sigaction the_action;
143     the_action.sa_sigaction = sig_capture::signalHandler;
144     the_action.sa_flags = SA_SIGINFO;
145     sigfillset(&the_action.sa_mask);
146
147     sigaction(signo, &the_action, nullptr);
148 }
149
150 inline void unprep_signal(int signo)
151 {
152     signal(signo, SIG_DFL);
153 }
154
155 inline bool get_siginfo(int signo, siginfo_t *siginfo)
156 {
157     sig_capture::siginfo_p = siginfo;
158
159     sigset_t mask;
160     sigfillset(&mask);
161     sigdelset(&mask, signo);
162     sigsuspend(&mask);
163     return true;
164 }
165
166 #endif
167
168 template <class Base> class kqueue_loop : public Base
169 {
170     int kqfd; // kqueue fd
171
172     // The kqueue signal reporting mechanism *coexists* with the regular signal
173     // delivery mechanism without having any connection to it. Whereas regular signals can be
174     // queued (especially "realtime" signals, via sigqueue()), kqueue just maintains a counter
175     // of delivery attempts and clears this when we read the event. What this means is that
176     // kqueue won't necessarily tell us if signals are pending, in the case that:
177     //  1) it already reported the attempted signal delivery and
178     //  2) more than instance one of the same signal was pending at that time and
179     //  3) no more deliveries of the same signal have been attempted in the meantime.
180     // Also, kqueue won't tell us about signals that were pending at the time the signal filter
181     // was added. Finally, because pending signals can be merged, the count of delivery attempts
182     // provided by kqueue does not necessarily match the number of signals actually pending.
183     //
184     // Note that POSIX allows for multiple instances of a signal to be pending even on systems
185     // that don't support queueing of signals.
186     //
187     // Ultimately, this means we need to check for pending signals independently of what kqueue
188     // tells us.
189
190     // Base contains:
191     //   lock - a lock that can be used to protect internal structure.
192     //          receive*() methods will be called with lock held.
193     //   receive_signal(sigdata_t &, user *) noexcept
194     //   receive_fd_event(fd_r, user *, int flags) noexcept
195     
196     using sigdata_t = kqueue_traits::sigdata_t;
197     using fd_r = typename kqueue_traits::fd_r;
198     
199     // The flag to specify poll() semantics for regular file readiness: that is, we want
200     // ready-for-read to be returned even at end of file:
201 #if defined(NOTE_FILE_POLL)
202     // FreeBSD:
203     constexpr static int POLL_SEMANTICS = NOTE_FILE_POLL;
204 #else
205     // Note that macOS has an "EV_POLL" defined that looks like it should give poll semantics
206     // when passed as a flag. However, it is filtered at the syscall entry so we cannot use it in
207     // kqueue. (The kernel uses it internally to implement poll()).
208     constexpr static int POLL_SEMANTICS = 0;
209 #endif
210
211     void process_events(struct kevent *events, int r)
212     {
213         std::lock_guard<decltype(Base::lock)> guard(Base::lock);
214         
215         for (int i = 0; i < r; i++) {
216             if (events[i].filter == EVFILT_SIGNAL) {
217                 bool reenable = pull_signal(events[i].ident, events[i].udata);
218                 events[i].flags = reenable ? EV_ENABLE : EV_DISABLE;
219             }
220             else if (events[i].filter == EVFILT_READ || events[i].filter == EVFILT_WRITE) {
221                 int flags = events[i].filter == EVFILT_READ ? IN_EVENTS : OUT_EVENTS;
222                 auto r = Base::receive_fd_event(*this, fd_r(events[i].ident), events[i].udata, flags);
223                 if (std::get<0>(r) == 0) {
224                     // we use EV_CLEAR to clear the EOF status of fifos/pipes (and wait for
225                     // another connection).
226                     events[i].flags = EV_DISABLE | EV_CLEAR;
227                 }
228                 else {
229                     events[i].flags = EV_ENABLE;
230                 }
231             }
232             else {
233                 events[i].flags = EV_DISABLE;
234             }
235         }
236         
237         // Now we disable all received events, to simulate EV_DISPATCH:
238         kevent(kqfd, events, r, nullptr, 0, nullptr);
239     }
240     
241     // Pull a signal from pending, and report it, until it is no longer pending or the watch
242     // should be disabled. Call with lock held.
243     // Returns:  true if watcher should be enabled, false if disabled.
244     bool pull_signal(int signo, void *userdata)
245     {
246         bool enable_filt = true;
247         sigdata_t siginfo;
248
249 #if _POSIX_REALTIME_SIGNALS > 0
250         struct timespec timeout = {0, 0};
251         sigset_t sigw_mask;
252         sigemptyset(&sigw_mask);
253         sigaddset(&sigw_mask, signo);
254         int rsigno = sigtimedwait(&sigw_mask, &siginfo.info, &timeout);
255         while (rsigno > 0) {
256             if (Base::receive_signal(*this, siginfo, userdata)) {
257                 enable_filt = false;
258                 break;
259             }
260             rsigno = sigtimedwait(&sigw_mask, &siginfo.info, &timeout);
261         }
262 #else
263         // we have no sigtimedwait.
264         sigset_t pending_sigs;
265         sigpending(&pending_sigs);
266         while (sigismember(&pending_sigs, signo)) {
267             get_siginfo(signo, &siginfo.info);
268             if (Base::receive_signal(*this, siginfo, userdata)) {
269                 enable_filt = false;
270                 break;
271             }
272             sigpending(&pending_sigs);
273         }
274 #endif
275         return enable_filt;
276     }
277
278     public:
279     
280     /**
281      * kqueue_loop constructor.
282      *
283      * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised.
284      */
285     kqueue_loop()
286     {
287         kqfd = kqueue();
288         if (kqfd == -1) {
289             throw std::system_error(errno, std::system_category());
290         }
291         Base::init(this);
292     }
293     
294     ~kqueue_loop()
295     {
296         close(kqfd);
297     }
298     
299     void set_filter_enabled(short filterType, uintptr_t ident, void *udata, bool enable)
300     {
301         // Note, on OpenBSD enabling or disabling filter will not alter the filter parameters (udata etc);
302         // on OS X however, it will. Therefore we set udata here (to the same value as it was originally
303         // set) in order to work correctly on both kernels.
304         struct kevent kev;
305         int fflags = (filterType == EVFILT_READ) ? POLL_SEMANTICS : 0;
306         EV_SET(&kev, ident, filterType, enable ? EV_ENABLE : EV_DISABLE, fflags, 0, udata);
307         kevent(kqfd, &kev, 1, nullptr, 0, nullptr);
308     }
309     
310     void remove_filter(short filterType, uintptr_t ident)
311     {
312         struct kevent kev;
313         EV_SET(&kev, ident, filterType, EV_DELETE, 0, 0, 0);
314         kevent(kqfd, &kev, 1, nullptr, 0, nullptr);    
315     }
316     
317     //        fd:  file descriptor to watch
318     //  userdata:  data to associate with descriptor
319     //     flags:  IN_EVENTS | OUT_EVENTS | ONE_SHOT
320     //             (only one of IN_EVENTS/OUT_EVENTS can be specified)
321     // soft_fail:  true if unsupported file descriptors should fail by returning false instead
322     //             of throwing an exception
323     // returns: true on success; false if file descriptor type isn't supported and emulate == true
324     // throws:  std::system_error or std::bad_alloc on failure
325     bool add_fd_watch(int fd, void *userdata, int flags, bool enabled = true, bool emulate = false)
326     {
327         short filter = (flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE;
328
329         if (filter == EVFILT_READ && POLL_SEMANTICS == 0 && emulate) {
330             // We can't request poll semantics, so check for regular file:
331             struct stat statbuf;
332             if (fstat(fd, &statbuf) == -1) {
333                 throw new std::system_error(errno, std::system_category());
334             }
335             if ((statbuf.st_mode & S_IFMT) == S_IFREG) {
336                 // Regular file: emulation required
337                 return false;
338             }
339         }
340
341         int fflags = (filter == EVFILT_READ) ? POLL_SEMANTICS : 0;
342
343         struct kevent kev;
344         EV_SET(&kev, fd, filter, EV_ADD | (enabled ? 0 : EV_DISABLE), fflags, 0, userdata);
345         if (kevent(kqfd, &kev, 1, nullptr, 0, nullptr) == -1) {
346             // Note that kqueue supports EVFILT_READ on regular file fd's, but not EVFILT_WRITE.
347             if (filter == EVFILT_WRITE && errno == EINVAL && emulate) {
348                 return false; // emulate
349             }
350             throw new std::system_error(errno, std::system_category());
351         }
352         return true;
353     }
354
355     // returns: 0 on success
356     //          IN_EVENTS  if in watch requires emulation
357     //          OUT_EVENTS if out watch requires emulation
358     int add_bidi_fd_watch(int fd, void *userdata, int flags, bool emulate = false)
359     {
360 #ifdef EV_RECEIPT
361         struct kevent kev[2];
362         struct kevent kev_r[2];
363         short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT;
364         short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE) | EV_RECEIPT;
365         EV_SET(&kev[0], fd, EVFILT_READ, rflags, POLL_SEMANTICS, 0, userdata);
366         EV_SET(&kev[1], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
367
368         int r = kevent(kqfd, kev, 2, kev_r, 2, nullptr);
369
370         if (r == -1) {
371             throw new std::system_error(errno, std::system_category());
372         }
373
374         // Some possibilities:
375         // - both ends failed. We'll throw an error rather than allowing emulation.
376         // - read watch failed, write succeeded : should not happen.
377         // - read watch added, write failed: if emulate == true, succeed;
378         //                                   if emulate == false, remove read and fail.
379
380         if (kev_r[0].data != 0) {
381             // read failed
382             throw new std::system_error(kev_r[0].data, std::system_category());
383         }
384
385         if (kev_r[1].data != 0) {
386             if (emulate) {
387                 // We can emulate, but, do we have correct semantics?
388                 if (POLL_SEMANTICS != 0) {
389                     return OUT_EVENTS;
390                 }
391
392                 // if we can't get poll semantics, emulate for read as well:
393                 // first remove read watch:
394                 EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
395                 kevent(kqfd, kev, 1, nullptr, 0, nullptr);
396                 return IN_EVENTS | OUT_EVENTS;
397             }
398             // remove read watch
399             EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
400             kevent(kqfd, kev, 1, nullptr, 0, nullptr);
401             // throw exception
402             throw new std::system_error(kev_r[1].data, std::system_category());
403         }
404
405         return 0;
406 #else
407         // OpenBSD doesn't have EV_RECEIPT: install the watches one at a time
408         struct kevent kev[1];
409
410         short rflags = EV_ADD | ((flags & IN_EVENTS) ? 0 : EV_DISABLE);
411         short wflags = EV_ADD | ((flags & OUT_EVENTS) ? 0 : EV_DISABLE);
412         EV_SET(&kev[0], fd, EVFILT_READ, rflags, 0, 0, userdata);
413
414         int r = kevent(kqfd, kev, 1, nullptr, 0, nullptr);
415
416         if (r == -1) {
417             throw new std::system_error(errno, std::system_category());
418         }
419
420         EV_SET(&kev[0], fd, EVFILT_WRITE, wflags, 0, 0, userdata);
421
422         r = kevent(kqfd, kev, 1, nullptr, 0, nullptr);
423
424         if (r == -1) {
425             if (emulate) {
426                 return OUT_EVENTS;
427             }
428             // remove read watch
429             EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, userdata);
430             kevent(kqfd, kev, 1, nullptr, 0, nullptr);
431             // throw exception
432             throw new std::system_error(errno, std::system_category());
433         }
434
435         return 0;
436 #endif
437     }
438     
439     // flags specifies which watch to remove; ignored if the loop doesn't support
440     // separate read/write watches.
441     void remove_fd_watch(int fd, int flags)
442     {        
443         remove_filter((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd);
444     }
445     
446     void remove_fd_watch_nolock(int fd, int flags)
447     {
448         remove_fd_watch(fd, flags);
449     }
450
451     void remove_bidi_fd_watch(int fd) noexcept
452     {
453         struct kevent kev[2];
454         EV_SET(&kev[0], fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
455         EV_SET(&kev[1], fd, EVFILT_WRITE, EV_DELETE, 0, 0, nullptr);
456         
457         kevent(kqfd, kev, 2, nullptr, 0, nullptr);
458     }
459     
460     void enable_fd_watch(int fd, void *userdata, int flags)
461     {
462         set_filter_enabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, userdata, true);
463     }
464     
465     void enable_fd_watch_nolock(int fd, void *userdata, int flags)
466     {
467         enable_fd_watch(fd, userdata, flags);
468     }
469     
470     void disable_fd_watch(int fd, int flags)
471     {
472         set_filter_enabled((flags & IN_EVENTS) ? EVFILT_READ : EVFILT_WRITE, fd, nullptr, false);
473     }
474     
475     void disable_fd_watch_nolock(int fd, int flags)
476     {
477         disable_fd_watch(fd, flags);
478     }
479
480     // Note signal should be masked before call.
481     void add_signal_watch(int signo, void *userdata)
482     {
483         std::lock_guard<decltype(Base::lock)> guard(Base::lock);
484         add_signal_watch_nolock(signo, userdata);
485     }
486
487     // Note signal should be masked before call.
488     void add_signal_watch_nolock(int signo, void *userdata)
489     {
490         prepare_signal(signo);
491
492         // We need to register the filter with the kqueue early, to avoid a race where we miss
493         // signals:
494         struct kevent evt;
495         EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ADD | EV_DISABLE, 0, 0, userdata);
496         if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) {
497             throw new std::system_error(errno, std::system_category());
498         }
499         // TODO use EV_DISPATCH if available (not on OpenBSD/OS X)
500         
501         // The signal might be pending already but won't be reported by kqueue in that case. We can queue
502         // it immediately (note that it might be pending multiple times, so we need to re-check once signal
503         // processing finishes if it is re-armed).
504
505         bool enable_filt = pull_signal(signo, userdata);
506
507         if (enable_filt) {
508             evt.flags = EV_ENABLE;
509             if (kevent(kqfd, &evt, 1, nullptr, 0, nullptr) == -1) {
510                 throw new std::system_error(errno, std::system_category());
511             }
512         }
513     }
514     
515     // Note, called with lock held:
516     void rearm_signal_watch_nolock(int signo, void *userdata) noexcept
517     {
518         if (pull_signal(signo, userdata)) {
519             struct kevent evt;
520             EV_SET(&evt, signo, EVFILT_SIGNAL, EV_ENABLE, 0, 0, userdata);
521             // TODO use EV_DISPATCH if available (not on OpenBSD)
522
523             kevent(kqfd, &evt, 1, nullptr, 0, nullptr);
524         }
525     }
526     
527     void remove_signal_watch_nolock(int signo) noexcept
528     {
529         unprep_signal(signo);
530         
531         struct kevent evt;
532         EV_SET(&evt, signo, EVFILT_SIGNAL, EV_DELETE, 0, 0, 0);
533         
534         kevent(kqfd, &evt, 1, nullptr, 0, nullptr);
535     }
536
537     void remove_signal_watch(int signo) noexcept
538     {
539         std::lock_guard<decltype(Base::lock)> guard(Base::lock);
540         remove_signal_watch_nolock(signo);
541     }
542
543     public:
544
545     // If events are pending, process an unspecified number of them.
546     // If no events are pending, wait until one event is received and
547     // process this event (and possibly any other events received
548     // simultaneously).
549     // If processing an event removes a watch, there is a possibility
550     // that the watched event will still be reported (if it has
551     // occurred) before pull_events() returns.
552     //
553     //  do_wait - if false, returns immediately if no events are
554     //            pending.
555     void pull_events(bool do_wait)
556     {
557         struct kevent events[16];
558         struct timespec ts;
559
560         ts.tv_sec = 0;
561         ts.tv_nsec = 0;
562         int r = kevent(kqfd, nullptr, 0, events, 16, do_wait ? nullptr : &ts);
563         if (r == -1 || r == 0) {
564             // signal or no events
565             return;
566         }
567         
568         do {
569             process_events(events, r);
570             r = kevent(kqfd, nullptr, 0, events, 16, &ts);
571         } while (r > 0);
572     }
573 };
574
575 } // end namespace