Fix mistakes in cptest asserts
[oweals/dinit.git] / src / dasynq / dasynq.h
1 #ifndef DASYNQ_H_INCLUDED
2 #define DASYNQ_H_INCLUDED
3
4 #include "dasynq-config.h"
5
6 #include "dasynq-flags.h"
7 #include "dasynq-stableheap.h"
8 #include "dasynq-interrupt.h"
9 #include "dasynq-util.h"
10
11 // Dasynq uses a "mix-in" pattern to produce an event loop implementation incorporating selectable
12 // implementations of various components (main backend, timers, child process watch mechanism etc). In C++
13 // this can be achieved by a template for some component which extends its own type parameter:
14 //
15 //     template <typename Base> class X : public B { .... }
16 //
17 // (Note that in a sense this is actually the opposite of the so-called "Curiously Recurring Template"
18 // pattern, which can be used to achieve a similar goal). We can chain several such components together to
19 // "mix in" the functionality of each into the final class, eg:
20 //
21 //     template <typename T> using loop_t =
22 //         epoll_loop<interrupt_channel<timer_fd_events<child_proc_events<T>>>>;
23 //
24 // (which defines an alias template "loop_t", whose implementation will use the epoll backend, a standard
25 // interrupt channel implementation, a timerfd-based timer implementation, and the standard child process
26 // watch implementation). We sometimes need the base class to be able to call derived-class members: to do
27 // this we pass a reference to the derived instance into a template member function in the base, for example
28 // the "init" function:
29 //
30 //     template <typename T> void init(T *derived)
31 //     {
32 //         // can call method on derived:
33 //         derived->add_listener();
34 //         // chain to next class:
35 //         Base::init(derived);
36 //     }
37 //
38 // The 'loop_t' defined above is a template for a usable backend mechanism for the event_loop template
39 // class. At the base all this is the event_dispatch class, defined below, which receives event
40 // notifications and inserts them into a queue for processing. The event_loop class, also below, wraps this
41 // (via composition) in an interface which can be used to register/de-register/enable/disable event
42 // watchers, and which can process the queued events by calling the watcher callbacks. The event_loop class
43 // also provides some synchronisation to ensure thread-safety, and abstracts away some differences between
44 // backends.
45 //
46 // The differences are exposed as traits, partly via a separate traits class (loop_traits_t as defined
47 // below, which contains the "main" traits, particularly the sigdata_t, fd_r and fd_s types). Note that the
48 // event_dispatch class exposes the loop traits as traits_t, and these are then potentially augmented at
49 // each stage of the mechanism inheritance chain (i.e. the final traits are exposed as
50 // `loop_t<event_dispatch>::traits_t'.
51 //
52 // The trait members are:
53 //   sigdata_t  - a wrapper for the siginfo_t type or equivalent used to pass signal parameters
54 //   fd_r       - a file descriptor wrapper, if the backend is able to retrieve the file descriptor when
55 //                it receives an fd event. Not all backends can do this.
56 //   fd_s       - a file descriptor storage wrapper. If the backend can retrieve file descriptors, this
57 //                will be empty (and ideally zero-size), otherwise it stores a file descriptor.
58 //                With an fd_r and fd_s instance you can always retrieve the file descriptor:
59 //                `fdr.get_fd(fds)' will return it.
60 //   has_bidi_fd_watch
61 //              - boolean indicating whether a single watch can support watching for both input and output
62 //                events simultaneously
63 //   has_separate_rw_fd_watches
64 //              - boolean indicating whether it is possible to add separate input and output watches for the
65 //                same fd. Either this or has_bidi_fd_watch must be true.
66 //   interrupt_after_fd_add
67 //              - boolean indicating if a loop interrupt must be forced after adding/enabling an fd watch.
68 //   interrupt_after_signal_add
69 //              - boolean indicating if a loop interrupt must be forced after adding or enabling a signal
70 //                watch.
71 //   supports_non_oneshot_fd
72 //              - boolean; if true, event_dispatch can arm an fd watch without ONESHOT and returning zero
73 //                events from receive_fd_event (the event notification function) will leave the descriptor
74 //                armed. If false, all fd watches are effectively ONESHOT (they can be re-armed immediately
75 //                after delivery by returning an appropriate event flag mask).
76 //   full_timer_support
77 //              - boolean indicating that the monotonic and system clocks are actually different clocks and
78 //                that timers against the system clock will work correctly if the system clock time is
79 //                adjusted. If false, the monotic clock may not be present at all (monotonic clock will map
80 //                to system clock), and timers against either clock are not guaranteed to work correctly if
81 //                the system clock is adjusted.
82
83 #if DASYNQ_HAVE_EPOLL <= 0
84 #if _POSIX_TIMERS > 0
85 #include "dasynq-posixtimer.h"
86 namespace dasynq {
87     template <typename T, bool provide_mono_timer = true> using timer_events = posix_timer_events<T, provide_mono_timer>;
88 }
89 #else
90 #include "dasynq-itimer.h"
91 namespace dasynq {
92     template <typename T, bool provide_mono_timer = true> using timer_events = itimer_events<T, provide_mono_timer>;
93 }
94 #endif
95 #endif
96
97 #if DASYNQ_HAVE_KQUEUE
98 #if DASYNQ_KQUEUE_MACOS_WORKAROUND
99 #include "dasynq-kqueue-macos.h"
100 #include "dasynq-childproc.h"
101 namespace dasynq {
102     template <typename T> using loop_t = macos_kqueue_loop<timer_events<child_proc_events<interrupt_channel<T>>, false>>;
103     using loop_traits_t = macos_kqueue_traits;
104 }
105 #else
106 #include "dasynq-kqueue.h"
107 #include "dasynq-childproc.h"
108 namespace dasynq {
109     template <typename T> using loop_t = kqueue_loop<timer_events<child_proc_events<interrupt_channel<T>>, false>>;
110     using loop_traits_t = kqueue_traits;
111 }
112 #endif
113 #elif DASYNQ_HAVE_EPOLL
114 #include "dasynq-epoll.h"
115 #include "dasynq-timerfd.h"
116 #include "dasynq-childproc.h"
117 namespace dasynq {
118     template <typename T> using loop_t = epoll_loop<interrupt_channel<timer_fd_events<child_proc_events<T>>>>;
119     using loop_traits_t = epoll_traits;
120 }
121 #else
122 #include "dasynq-childproc.h"
123 #if DASYNQ_HAVE_PSELECT
124 #include "dasynq-pselect.h"
125 namespace dasynq {
126     template <typename T> using loop_t = pselect_events<timer_events<interrupt_channel<child_proc_events<T>>, false>>;
127     using loop_traits_t = select_traits;
128 }
129 #else
130 #include "dasynq-select.h"
131 namespace dasynq {
132     template <typename T> using loop_t = select_events<timer_events<interrupt_channel<child_proc_events<T>>, false>>;
133     using loop_traits_t = select_traits;
134 }
135 #endif
136 #endif
137
138 #include <atomic>
139 #include <condition_variable>
140 #include <cstdint>
141 #include <cstddef>
142 #include <system_error>
143
144 #include <unistd.h>
145 #include <fcntl.h>
146
147 #include "dasynq-mutex.h"
148
149 #include "dasynq-basewatchers.h"
150
151 namespace dasynq {
152
153 /**
154  * Values for rearm/disarm return from event handlers
155  */
156 enum class rearm
157 {
158     /** Re-arm the event watcher so that it receives further events */
159     REARM,
160     /** Disarm the event watcher so that it receives no further events, until it is re-armed explicitly */
161     DISARM,
162     /** Leave in current armed/disarmed state */
163     NOOP,
164     /** Remove the event watcher (and call "removed" callback) */
165     REMOVE,
166     /** The watcher has been removed - don't touch it! */
167     REMOVED,
168     /** RE-queue the watcher to have its notification called again */
169     REQUEUE
170 };
171
172 namespace dprivate {
173
174     // Classes for implementing a fair(ish) wait queue.
175     // A queue node can be signalled when it reaches the head of
176     // the queue.
177
178     template <typename T_Mutex> class waitqueue;
179     template <typename T_Mutex> class waitqueue_node;
180
181     // Select an appropriate condition variable type for a mutex:
182     // condition_variable if mutex is std::mutex, or condition_variable_any
183     // otherwise.
184     template <class T_Mutex> class condvar_selector;
185
186     template <> class condvar_selector<std::mutex>
187     {
188         public:
189         typedef std::condition_variable condvar;
190     };
191
192     template <class T_Mutex> class condvar_selector
193     {
194         public:
195         typedef std::condition_variable_any condvar;
196     };
197
198     // For a single-threaded loop, the waitqueue is a no-op:
199     template <> class waitqueue_node<null_mutex>
200     {
201         // Specialised waitqueue_node for null_mutex.
202         friend class waitqueue<null_mutex>;
203         
204         public:
205         void wait(std::unique_lock<null_mutex> &ul) { }
206         void signal() { }
207         
208         DASYNQ_EMPTY_BODY;
209     };
210
211     template <typename T_Mutex> class waitqueue_node
212     {
213         typename condvar_selector<T_Mutex>::condvar condvar;
214         friend class waitqueue<T_Mutex>;
215
216         // ptr to next node in queue, set to null when added to queue tail:
217         waitqueue_node * next;
218         
219         public:
220         void signal()
221         {
222             condvar.notify_one();
223         }
224         
225         void wait(std::unique_lock<T_Mutex> &mutex_lock)
226         {
227             condvar.wait(mutex_lock);
228         }
229     };
230
231     template <> class waitqueue<null_mutex>
232     {
233         public:
234         // remove current head of queue, return new head:
235         waitqueue_node<null_mutex> * unqueue()
236         {
237             return nullptr;
238         }
239         
240         waitqueue_node<null_mutex> * get_head()
241         {
242             return nullptr;
243         }
244         
245         waitqueue_node<null_mutex> * get_second()
246         {
247             return nullptr;
248         }
249
250         bool check_head(waitqueue_node<null_mutex> &node)
251         {
252             return true;
253         }
254         
255         bool is_empty()
256         {
257             return true;
258         }
259         
260         void queue(waitqueue_node<null_mutex> *node)
261         {
262         }
263     };
264
265     template <typename T_Mutex> class waitqueue
266     {
267         waitqueue_node<T_Mutex> * tail = nullptr;
268         waitqueue_node<T_Mutex> * head = nullptr;
269
270         public:
271         // remove current head of queue, return new head:
272         waitqueue_node<T_Mutex> * unqueue()
273         {
274             head = head->next;
275             if (head == nullptr) {
276                 tail = nullptr;
277             }
278             return head;
279         }
280         
281         waitqueue_node<T_Mutex> * get_head()
282         {
283             return head;
284         }
285         
286         waitqueue_node<T_Mutex> * get_second()
287         {
288             return head->next;
289         }
290
291         bool check_head(waitqueue_node<T_Mutex> &node)
292         {
293             return head == &node;
294         }
295         
296         bool is_empty()
297         {
298             return head == nullptr;
299         }
300         
301         void queue(waitqueue_node<T_Mutex> *node)
302         {
303             node->next = nullptr;
304             if (tail) {
305                 tail->next = node;
306             }
307             else {
308                 head = node;
309             }
310             tail = node;
311         }
312     };
313     
314     // friend of event_loop for giving access to various private members
315     class loop_access {
316         public:
317         template <typename Loop>
318         static typename Loop::mutex_t &get_base_lock(Loop &loop) noexcept
319         {
320             return loop.get_base_lock();
321         }
322
323         template <typename Loop>
324         static rearm process_fd_rearm(Loop &loop, typename Loop::base_fd_watcher *bfw,
325                 rearm rearm_type) noexcept
326         {
327             return loop.process_fd_rearm(bfw, rearm_type);
328         }
329
330         template <typename Loop>
331         static rearm process_primary_rearm(Loop &loop, typename Loop::base_bidi_fd_watcher *bdfw,
332                 rearm rearm_type) noexcept
333         {
334             return loop.process_primary_rearm(bdfw, rearm_type);
335         }
336
337         template <typename Loop>
338         static rearm process_secondary_rearm(Loop &loop, typename Loop::base_bidi_fd_watcher * bdfw,
339                 base_watcher * outw, rearm rearm_type) noexcept
340         {
341             return loop.process_secondary_rearm(bdfw, outw, rearm_type);
342         }
343
344         template <typename Loop>
345         static void process_signal_rearm(Loop &loop, typename Loop::base_signal_watcher * bsw,
346                 rearm rearm_type) noexcept
347         {
348             loop.process_signal_rearm(bsw, rearm_type);
349         }
350
351         template <typename Loop>
352         static void process_child_watch_rearm(Loop &loop, typename Loop::base_child_watcher *bcw,
353                 rearm rearm_type) noexcept
354         {
355             loop.process_child_watch_rearm(bcw, rearm_type);
356         }
357
358         template <typename Loop>
359         static void process_timer_rearm(Loop &loop, typename Loop::base_timer_watcher *btw,
360                 rearm rearm_type) noexcept
361         {
362             loop.process_timer_rearm(btw, rearm_type);
363         }
364
365         template <typename Loop>
366         static void requeue_watcher(Loop &loop, base_watcher *watcher) noexcept
367         {
368             loop.requeue_watcher(watcher);
369         }
370
371         template <typename Loop>
372         static void release_watcher(Loop &loop, base_watcher *watcher) noexcept
373         {
374             loop.release_watcher(watcher);
375         }
376     };
377
378     // Do standard post-dispatch processing for a watcher. This handles the case of removing or
379     // re-queueing watchers depending on the rearm type. This is called from the individual
380     // watcher dispatch functions to handle REMOVE or REQUEUE re-arm values.
381     template <typename Loop> void post_dispatch(Loop &loop, base_watcher *watcher, rearm rearm_type)
382     {
383         if (rearm_type == rearm::REMOVE) {
384             loop_access::get_base_lock(loop).unlock();
385             loop_access::release_watcher(loop, watcher);
386             watcher->watch_removed();
387             loop_access::get_base_lock(loop).lock();
388         }
389         else if (rearm_type == rearm::REQUEUE) {
390             loop_access::requeue_watcher(loop, watcher);
391         }
392     }
393
394     // Post-dispatch handling for bidi fd watchers.
395     template <typename Loop> void post_dispatch(Loop &loop, bidi_fd_watcher<Loop> *bdfd_watcher,
396             base_watcher *out_watcher, rearm rearm_type)
397     {
398         base_watcher *watcher = (base_watcher *)bdfd_watcher;
399         if (rearm_type == rearm::REMOVE) {
400             loop_access::get_base_lock(loop).unlock();
401             loop_access::release_watcher(loop, watcher);
402             loop_access::release_watcher(loop, out_watcher);
403             watcher->watch_removed();
404             loop_access::get_base_lock(loop).lock();
405         }
406         else if (rearm_type == rearm::REQUEUE) {
407             loop_access::requeue_watcher(loop, watcher);
408         }
409     }
410
411     // The event_dispatch class serves as the base class (mixin) for the backend mechanism. It
412     // mostly manages queing and dequeing of events and maintains/owns the relevant data
413     // structures, including a mutex lock.
414     //
415     // The backend mechanism should call one of the receiveXXX functions to notify of an event
416     // received. The watcher will then be queued.
417     //
418     // In general the functions should be called with lock held. In practice this means that the
419     // event loop backend implementations (that deposit received events here) must obtain the
420     // lock; they are also free to use it to protect their own internal data structures.
421     template <typename Traits, typename LoopTraits> class event_dispatch
422     {
423         friend class dasynq::event_loop<typename LoopTraits::mutex_t, LoopTraits>;;
424
425         public:
426         using mutex_t = typename LoopTraits::mutex_t;
427         using traits_t = Traits;
428
429         private:
430
431         // queue data structure/pointer
432         prio_queue event_queue;
433         
434         using base_signal_watcher = dprivate::base_signal_watcher<typename traits_t::sigdata_t>;
435         using base_child_watcher = dprivate::base_child_watcher;
436         using base_timer_watcher = dprivate::base_timer_watcher;
437         
438         // Add a watcher into the queueing system (but don't queue it). Call with lock held.
439         //   may throw: std::bad_alloc
440         void prepare_watcher(base_watcher *bwatcher)
441         {
442             allocate_handle(event_queue, bwatcher->heap_handle, bwatcher);
443         }
444         
445         void queue_watcher(base_watcher *bwatcher) noexcept
446         {
447             event_queue.insert(bwatcher->heap_handle, bwatcher->priority);
448         }
449         
450         void dequeue_watcher(base_watcher *bwatcher) noexcept
451         {
452             if (event_queue.is_queued(bwatcher->heap_handle)) {
453                 event_queue.remove(bwatcher->heap_handle);
454             }
455         }
456
457         // Remove watcher from the queueing system
458         void release_watcher(base_watcher *bwatcher) noexcept
459         {
460             event_queue.deallocate(bwatcher->heap_handle);
461         }
462         
463         protected:
464         mutex_t lock;
465
466         template <typename T> void init(T *loop) noexcept { }
467         
468         void sigmaskf(int how, const sigset_t *set, sigset_t *oset)
469         {
470             LoopTraits::sigmaskf(how, set, oset);
471         }
472
473         // Receive a signal; return true to disable signal watch or false to leave enabled.
474         // Called with lock held.
475         template <typename T>
476         bool receive_signal(T &loop_mech, typename Traits::sigdata_t & siginfo, void * userdata) noexcept
477         {
478             base_signal_watcher * bwatcher = static_cast<base_signal_watcher *>(userdata);
479             bwatcher->siginfo = siginfo;
480             queue_watcher(bwatcher);
481             return true;
482         }
483         
484         // Receive fd event delivered from backend mechansim. Returns the desired watch mask, as per
485         // set_fd_enabled, which can be used to leave the watch disabled, re-enable it or re-enable
486         // one direction of a bi-directional watcher.
487         template <typename T>
488         std::tuple<int, typename Traits::fd_s> receive_fd_event(T &loop_mech, typename Traits::fd_r fd_r,
489                 void * userdata, int flags) noexcept
490         {
491             base_fd_watcher * bfdw = static_cast<base_fd_watcher *>(userdata);
492             
493             bfdw->event_flags |= flags;
494             typename Traits::fd_s watch_fd_s {bfdw->watch_fd};
495             
496             base_watcher * bwatcher = bfdw;
497             
498             bool is_multi_watch = bfdw->watch_flags & multi_watch;
499             if (is_multi_watch) {                
500                 base_bidi_fd_watcher *bbdw = static_cast<base_bidi_fd_watcher *>(bwatcher);
501                 bbdw->watch_flags &= ~flags;
502                 if ((flags & IN_EVENTS) && (flags & OUT_EVENTS)) {
503                     // Queue the secondary watcher first:
504                     queue_watcher(&bbdw->out_watcher);
505                 }
506                 else if (flags & OUT_EVENTS) {                
507                     // Use the secondary watcher for queueing:
508                     bwatcher = &(bbdw->out_watcher);
509                 }
510             }
511
512             queue_watcher(bwatcher);
513             
514             if (is_multi_watch && ! traits_t::has_separate_rw_fd_watches) {
515                 // If this is a bidirectional fd-watch, it has been disabled in *both* directions
516                 // as the event was delivered. However, the other direction should not be disabled
517                 // yet, so we need to re-enable:
518                 int in_out_mask = IN_EVENTS | OUT_EVENTS;
519                 if ((bfdw->watch_flags & in_out_mask) != 0) {
520                     // We need to re-enable the other channel now:
521                     return std::make_tuple((bfdw->watch_flags & in_out_mask) | ONE_SHOT, watch_fd_s);
522                     // We are the polling thread: don't need to interrupt polling, even if it would
523                     // normally be required.
524                 }
525             }
526
527             return std::make_tuple(0, watch_fd_s);
528         }
529         
530         // Child process terminated. Called with both the main lock and the reaper lock held.
531         void receive_child_stat(pid_t child, int status, void * userdata) noexcept
532         {
533             base_child_watcher * watcher = static_cast<base_child_watcher *>(userdata);
534             watcher->child_status = status;
535             watcher->child_termd = true;
536             queue_watcher(watcher);
537         }
538         
539         void receive_timer_expiry(timer_handle_t & timer_handle, void * userdata, int intervals) noexcept
540         {
541             base_timer_watcher * watcher = static_cast<base_timer_watcher *>(userdata);
542             watcher->intervals += intervals;
543             queue_watcher(watcher);
544         }
545         
546         // Pull a single event from the queue; returns nullptr if the queue is empty.
547         // Call with lock held.
548         base_watcher * pull_event() noexcept
549         {
550             if (event_queue.empty()) {
551                 return nullptr;
552             }
553             
554             auto & rhndl = event_queue.get_root();
555             base_watcher *r = dprivate::get_watcher(event_queue, rhndl);
556             event_queue.pull_root();
557             return r;
558         }
559         
560         // Queue a watcher for removal, or issue "removed" callback to it.
561         // Call with lock free.
562         void issue_delete(base_watcher *watcher) noexcept
563         {
564             // This is only called when the attention lock is held, so if the watcher is not
565             // active/queued now, it cannot become active (and will not be reported with an event)
566             // during execution of this function.
567             
568             lock.lock();
569             
570             if (watcher->active) {
571                 // If the watcher is active, set deleteme true; the watcher will be removed
572                 // at the end of current processing (i.e. when active is set false).
573                 watcher->deleteme = true;
574                 lock.unlock();
575             }
576             else {
577                 // Actually do the delete.
578                 dequeue_watcher(watcher);
579                 release_watcher(watcher);
580                 
581                 lock.unlock();
582                 watcher->watch_removed();
583             }
584         }
585         
586         // Queue a watcher for removal, or issue "removed" callback to it.
587         // Call with lock free.
588         void issue_delete(base_bidi_fd_watcher *watcher) noexcept
589         {
590             lock.lock();
591             
592             if (watcher->active) {
593                 watcher->deleteme = true;
594                 release_watcher(watcher);
595             }
596             else {
597                 dequeue_watcher(watcher);
598                 release_watcher(watcher);
599                 watcher->read_removed = true;
600             }
601             
602             base_watcher *secondary = &(watcher->out_watcher);
603             if (secondary->active) {
604                 secondary->deleteme = true;
605                 release_watcher(watcher);
606             }
607             else {
608                 dequeue_watcher(secondary);
609                 release_watcher(watcher);
610                 watcher->write_removed = true;
611             }
612             
613             if (watcher->read_removed && watcher->write_removed) {
614                 lock.unlock();
615                 watcher->watch_removed();
616             }
617             else {
618                 lock.unlock();
619             }
620         }
621
622         event_dispatch() {  }
623         event_dispatch(const event_dispatch &) = delete;
624     };
625
626 }
627
628 // This is the main event_loop implementation. It serves as an interface to the event loop backend (of which
629 // it maintains an internal instance). It also serialises polling the backend and provides safe deletion of
630 // watchers (see comments inline).
631 //
632 // The T_Mutex type parameter specifies the mutex type. A null_mutex can be used for a single-threaded event
633 // loop; std::mutex, or any mutex providing a compatible interface, can be used for a thread-safe event
634 // loop.
635 //
636 // The Traits type parameter specifies any required traits for the event loop.  This specifies the back-end
637 // to use (backend_t, a template) and the basic back-end traits (backend_traits_t).
638 // The default is `default_traits<T_Mutex>'.
639 //
640 template <typename T_Mutex, typename Traits>
641 class event_loop
642 {
643     using my_event_loop_t = event_loop<T_Mutex, Traits>;
644
645     friend class dprivate::fd_watcher<my_event_loop_t>;
646     friend class dprivate::bidi_fd_watcher<my_event_loop_t>;
647     friend class dprivate::signal_watcher<my_event_loop_t>;
648     friend class dprivate::child_proc_watcher<my_event_loop_t>;
649     friend class dprivate::timer<my_event_loop_t>;
650     
651     friend class dprivate::loop_access;
652
653     using backend_traits_t = typename Traits::backend_traits_t;
654
655     template <typename T> using event_dispatch = dprivate::event_dispatch<T,Traits>;
656     using dispatch_t = event_dispatch<backend_traits_t>;
657     using loop_mech_t = typename Traits::template backend_t<dispatch_t>;
658     using reaper_mutex_t = typename loop_mech_t::reaper_mutex_t;
659
660     public:
661     using traits_t = Traits;
662     using loop_traits_t = typename loop_mech_t::traits_t;
663     using mutex_t = T_Mutex;
664     
665     private:
666     template <typename T> using waitqueue = dprivate::waitqueue<T>;
667     template <typename T> using waitqueue_node = dprivate::waitqueue_node<T>;
668     using base_watcher = dprivate::base_watcher;
669     using base_signal_watcher = dprivate::base_signal_watcher<typename loop_traits_t::sigdata_t>;
670     using base_fd_watcher = dprivate::base_fd_watcher;
671     using base_bidi_fd_watcher = dprivate::base_bidi_fd_watcher;
672     using base_child_watcher = dprivate::base_child_watcher;
673     using base_timer_watcher = dprivate::base_timer_watcher;
674     using watch_type_t = dprivate::watch_type_t;
675
676     loop_mech_t loop_mech;
677
678     // There is a complex problem with most asynchronous event notification mechanisms
679     // when used in a multi-threaded environment. Generally, a file descriptor or other
680     // event type that we are watching will be associated with some data used to manage
681     // that event source. For example a web server needs to maintain information about
682     // each client connection, such as the state of the connection (what protocol version
683     // has been negotiated, etc; if a transfer is taking place, what file is being
684     // transferred etc).
685     //
686     // However, sometimes we want to remove an event source (eg webserver wants to drop
687     // a connection) and delete the associated data. The problem here is that it is
688     // difficult to be sure when it is ok to actually remove the data, since when
689     // requesting to unwatch the source in one thread it is still possible that an
690     // event from that source is just being reported to another thread (in which case
691     // the data will be needed).
692     //
693     // To solve that, we:
694     // - allow only one thread to poll for events at a time, using a lock
695     // - use the same lock to prevent polling, if we want to unwatch an event source
696     // - generate an event to interrupt any polling that may already be occurring in
697     //   another thread
698     // - mark handlers as active if they are currently executing, and
699     // - when removing an active handler, simply set a flag which causes it to be
700     //   removed once the current processing is finished, rather than removing it
701     //   immediately.
702     //
703     // In particular the lock mechanism for preventing multiple threads polling and
704     // for allowing polling to be interrupted is tricky. We can't use a simple mutex
705     // since there is significant chance that it will be highly contended and there
706     // are no guarantees that its acquisition will be fair. In particular, we don't
707     // want a thread that is trying to unwatch a source being starved while another
708     // thread polls the event source.
709     //
710     // So, we use two wait queues protected by a single mutex. The "attn_waitqueue"
711     // (attention queue) is the high-priority queue, used for threads wanting to
712     // unwatch event sources. The "wait_waitquueue" is the queue used by threads
713     // that wish to actually poll for events, while they are waiting for the main
714     // queue to become quiet.
715     // - The head of the "attn_waitqueue" is always the holder of the lock
716     // - Therefore, a poll-waiter must be moved from the wait_waitqueue to the
717     //   attn_waitqueue to actually gain the lock. This is only done if the
718     //   attn_waitqueue is otherwise empty.
719     // - The mutex only protects manipulation of the wait queues, and so should not
720     //   be highly contended.
721     //
722     // To claim the lock for a poll-wait, the procedure is:
723     //    - check if the attn_waitqueue is empty;
724     //    - if it is, insert node at the head, thus claiming the lock, and return
725     //    - otherwise, insert node in the wait_waitqueue, and wait
726     // To claim the lock for an unwatch, the procedure is:
727     //    - insert node in the attn_waitqueue
728     //    - if the node is at the head of the queue, lock is claimed; return
729     //    - otherwise, if a poll is in progress, interrupt it
730     //    - wait until our node is at the head of the attn_waitqueue
731     
732     mutex_t wait_lock;  // protects the wait/attention queues
733     bool long_poll_running = false;  // whether any thread is polling the backend (with non-zero timeout)
734     waitqueue<mutex_t> attn_waitqueue;
735     waitqueue<mutex_t> wait_waitqueue;
736     
737     mutex_t &get_base_lock() noexcept
738     {
739         return loop_mech.lock;
740     }
741     
742     reaper_mutex_t &get_reaper_lock() noexcept
743     {
744         return loop_mech.get_reaper_lock();
745     }
746
747     void register_signal(base_signal_watcher *callBack, int signo)
748     {
749         std::lock_guard<mutex_t> guard(loop_mech.lock);
750
751         loop_mech.prepare_watcher(callBack);
752         try {
753             loop_mech.add_signal_watch_nolock(signo, callBack);
754             if (backend_traits_t::interrupt_after_signal_add) {
755                 interrupt_if_necessary();
756             }
757         }
758         catch (...) {
759             loop_mech.release_watcher(callBack);
760             throw;
761         }
762     }
763     
764     void deregister(base_signal_watcher *callBack, int signo) noexcept
765     {
766         loop_mech.remove_signal_watch(signo);
767         
768         waitqueue_node<T_Mutex> qnode;
769         get_attn_lock(qnode);
770         
771         loop_mech.issue_delete(callBack);
772         
773         release_lock(qnode);
774     }
775
776     void register_fd(base_fd_watcher *callback, int fd, int eventmask, bool enabled, bool emulate = false)
777     {
778         std::lock_guard<mutex_t> guard(loop_mech.lock);
779
780         loop_mech.prepare_watcher(callback);
781
782         try {
783             if (! loop_mech.add_fd_watch(fd, callback, eventmask | ONE_SHOT, enabled, emulate)) {
784                 callback->emulatefd = true;
785                 callback->emulate_enabled = enabled;
786                 if (enabled) {
787                     callback->event_flags = eventmask & IO_EVENTS;
788                     if (eventmask & IO_EVENTS) {
789                         requeue_watcher(callback);
790                     }
791                 }
792             }
793             else if (enabled && backend_traits_t::interrupt_after_fd_add) {
794                 interrupt_if_necessary();
795             }
796         }
797         catch (...) {
798             loop_mech.release_watcher(callback);
799             throw;
800         }
801     }
802     
803     // Register a bidi fd watcher. The watch_flags should already be set to the eventmask to watch
804     // (i.e. eventmask == callback->watch_flags is a pre-condition).
805     void register_fd(base_bidi_fd_watcher *callback, int fd, int eventmask, bool emulate = false)
806     {
807         std::lock_guard<mutex_t> guard(loop_mech.lock);
808
809         loop_mech.prepare_watcher(callback);
810         try {
811             loop_mech.prepare_watcher(&callback->out_watcher);
812             try {
813                 bool do_interrupt = false;
814                 if (backend_traits_t::has_separate_rw_fd_watches) {
815                     int r = loop_mech.add_bidi_fd_watch(fd, callback, eventmask | ONE_SHOT, emulate);
816                     if (r & IN_EVENTS) {
817                         callback->emulatefd = true;
818                         if (eventmask & IN_EVENTS) {
819                             callback->watch_flags &= ~IN_EVENTS;
820                             requeue_watcher(callback);
821                         }
822                     }
823                     else if ((eventmask & IN_EVENTS) && backend_traits_t::interrupt_after_fd_add) {
824                         do_interrupt = true;
825                     }
826
827                     if (r & OUT_EVENTS) {
828                         callback->out_watcher.emulatefd = true;
829                         if (eventmask & OUT_EVENTS) {
830                             callback->watch_flags &= ~OUT_EVENTS;
831                             requeue_watcher(&callback->out_watcher);
832                         }
833                     }
834                     else if ((eventmask & OUT_EVENTS) && backend_traits_t::interrupt_after_fd_add) {
835                         do_interrupt = true;
836                     }
837                 }
838                 else {
839                     if (! loop_mech.add_fd_watch(fd, callback, eventmask | ONE_SHOT, true, emulate)) {
840                         callback->emulatefd = true;
841                         callback->out_watcher.emulatefd = true;
842                         if (eventmask & IN_EVENTS) {
843                             callback->watch_flags &= ~IN_EVENTS;
844                             requeue_watcher(callback);
845                         }
846                         if (eventmask & OUT_EVENTS) {
847                             callback->watch_flags &= ~OUT_EVENTS;
848                             requeue_watcher(&callback->out_watcher);
849                         }
850                     }
851                     else if (backend_traits_t::interrupt_after_fd_add) {
852                         do_interrupt = true;
853                     }
854                 }
855
856                 if (do_interrupt) {
857                     interrupt_if_necessary();
858                 }
859             }
860             catch (...) {
861                 loop_mech.release_watcher(&callback->out_watcher);
862                 throw;
863             }
864         }
865         catch (...) {
866             loop_mech.release_watcher(callback);
867             throw;
868         }
869     }
870     
871     void set_fd_enabled(base_watcher *watcher, int fd, int watch_flags, bool enabled) noexcept
872     {
873         if (enabled) {
874             loop_mech.enable_fd_watch(fd, watcher, watch_flags | ONE_SHOT);
875             if (backend_traits_t::interrupt_after_fd_add) {
876                 interrupt_if_necessary();
877             }
878         }
879         else {
880             loop_mech.disable_fd_watch(fd, watch_flags);
881         }
882     }
883
884     void set_fd_enabled_nolock(base_watcher *watcher, int fd, int watch_flags, bool enabled) noexcept
885     {
886         if (enabled) {
887             loop_mech.enable_fd_watch_nolock(fd, watcher, watch_flags | ONE_SHOT);
888             if (backend_traits_t::interrupt_after_fd_add) {
889                 interrupt_if_necessary();
890             }
891         }
892         else {
893             loop_mech.disable_fd_watch_nolock(fd, watch_flags);
894         }
895     }
896     
897     void deregister(base_fd_watcher *callback, int fd) noexcept
898     {
899         if (callback->emulatefd) {
900             auto & ed = (dispatch_t &) loop_mech;
901             ed.issue_delete(callback);
902             return;
903         }
904         
905         loop_mech.remove_fd_watch(fd, callback->watch_flags);
906
907         waitqueue_node<T_Mutex> qnode;
908         get_attn_lock(qnode);
909         
910         auto & ed = (dispatch_t &) loop_mech;
911         ed.issue_delete(callback);
912         
913         release_lock(qnode);        
914     }
915     
916     void deregister(base_bidi_fd_watcher *callback, int fd) noexcept
917     {
918         if (backend_traits_t::has_separate_rw_fd_watches) {
919             loop_mech.remove_bidi_fd_watch(fd);
920         }
921         else {
922             loop_mech.remove_fd_watch(fd, callback->watch_flags);
923         }
924         
925         waitqueue_node<T_Mutex> qnode;
926         get_attn_lock(qnode);
927         
928         dispatch_t & ed = (dispatch_t &) loop_mech;
929         ed.issue_delete(callback);
930         
931         release_lock(qnode);
932     }
933     
934     void reserve_child_watch(base_child_watcher *callback)
935     {
936         std::lock_guard<mutex_t> guard(loop_mech.lock);
937
938         loop_mech.prepare_watcher(callback);
939         try {
940             loop_mech.reserve_child_watch_nolock(callback->watch_handle);
941         }
942         catch (...) {
943             loop_mech.release_watcher(callback);
944             throw;
945         }
946     }
947     
948     void unreserve(base_child_watcher *callback) noexcept
949     {
950         std::lock_guard<mutex_t> guard(loop_mech.lock);
951
952         loop_mech.unreserve_child_watch(callback->watch_handle);
953         loop_mech.release_watcher(callback);
954     }
955     
956     void register_child(base_child_watcher *callback, pid_t child)
957     {
958         std::lock_guard<mutex_t> guard(loop_mech.lock);
959         
960         loop_mech.prepare_watcher(callback);
961         try {
962             loop_mech.add_child_watch_nolock(callback->watch_handle, child, callback);
963         }
964         catch (...) {
965             loop_mech.release_watcher(callback);
966             throw;
967         }
968     }
969     
970     void register_reserved_child(base_child_watcher *callback, pid_t child) noexcept
971     {
972         loop_mech.add_reserved_child_watch(callback->watch_handle, child, callback);
973     }
974
975     void register_reserved_child_nolock(base_child_watcher *callback, pid_t child) noexcept
976     {
977         loop_mech.add_reserved_child_watch_nolock(callback->watch_handle, child, callback);
978     }
979     
980     void deregister(base_child_watcher *callback, pid_t child) noexcept
981     {
982         loop_mech.remove_child_watch(callback->watch_handle);
983
984         waitqueue_node<T_Mutex> qnode;
985         get_attn_lock(qnode);
986         
987         loop_mech.issue_delete(callback);
988         
989         release_lock(qnode);
990     }
991     
992     // Stop watching a child process, but retain watch reservation so that another child can be
993     // watched without running into resource allocation issues.
994     void stop_watch(base_child_watcher *callback) noexcept
995     {
996         loop_mech.stop_child_watch(callback->watch_handle);
997     }
998
999     void register_timer(base_timer_watcher *callback, clock_type clock)
1000     {
1001         std::lock_guard<mutex_t> guard(loop_mech.lock);
1002     
1003         loop_mech.prepare_watcher(callback);
1004         try {
1005             loop_mech.add_timer_nolock(callback->timer_handle, callback, clock);
1006         }
1007         catch (...) {
1008             loop_mech.release_watcher(callback);
1009         }
1010     }
1011     
1012     void set_timer(base_timer_watcher *callBack, const timespec &timeout, clock_type clock) noexcept
1013     {
1014         struct timespec interval {0, 0};
1015         loop_mech.set_timer(callBack->timer_handle, timeout, interval, true, clock);
1016     }
1017     
1018     void set_timer(base_timer_watcher *callBack, const timespec &timeout, const timespec &interval,
1019             clock_type clock) noexcept
1020     {
1021         loop_mech.set_timer(callBack->timer_handle, timeout, interval, true, clock);
1022     }
1023
1024     void set_timer_rel(base_timer_watcher *callBack, const timespec &timeout, clock_type clock) noexcept
1025     {
1026         struct timespec interval {0, 0};
1027         loop_mech.set_timer_rel(callBack->timer_handle, timeout, interval, true, clock);
1028     }
1029     
1030     void set_timer_rel(base_timer_watcher *callBack, const timespec &timeout,
1031             const timespec &interval, clock_type clock) noexcept
1032     {
1033         loop_mech.set_timer_rel(callBack->timer_handle, timeout, interval, true, clock);
1034     }
1035
1036     void set_timer_enabled(base_timer_watcher *callback, clock_type clock, bool enabled) noexcept
1037     {
1038         loop_mech.enable_timer(callback->timer_handle, enabled, clock);
1039     }
1040
1041     void set_timer_enabled_nolock(base_timer_watcher *callback, clock_type clock, bool enabled) noexcept
1042     {
1043         loop_mech.enable_timer_nolock(callback->timer_handle, enabled, clock);
1044     }
1045
1046     void stop_timer(base_timer_watcher *callback, clock_type clock) noexcept
1047     {
1048         loop_mech.stop_timer(callback->timer_handle, clock);
1049     }
1050
1051     void deregister(base_timer_watcher *callback, clock_type clock) noexcept
1052     {
1053         loop_mech.remove_timer(callback->timer_handle, clock);
1054         
1055         waitqueue_node<T_Mutex> qnode;
1056         get_attn_lock(qnode);
1057         
1058         loop_mech.issue_delete(callback);
1059         
1060         release_lock(qnode);
1061     }
1062     
1063     void dequeue_watcher(base_watcher *watcher) noexcept
1064     {
1065         loop_mech.dequeue_watcher(watcher);
1066     }
1067
1068     void requeue_watcher(base_watcher *watcher) noexcept
1069     {
1070         loop_mech.queue_watcher(watcher);
1071         interrupt_if_necessary();
1072     }
1073
1074     void release_watcher(base_watcher *watcher) noexcept
1075     {
1076         loop_mech.release_watcher(watcher);
1077     }
1078
1079     // Interrupt the current poll-waiter, if necessary - that is, if the loop is multi-thread safe, and if
1080     // there is currently another thread polling the backend event mechanism.
1081     void interrupt_if_necessary()
1082     {
1083         wait_lock.lock();
1084         bool attn_q_empty = attn_waitqueue.is_empty(); // (always false for single-threaded loops)
1085         wait_lock.unlock();
1086
1087         if (! attn_q_empty) {
1088             loop_mech.interrupt_wait();
1089         }
1090     }
1091
1092     // Acquire the attention lock (when held, ensures that no thread is polling the AEN
1093     // mechanism). This can be used to safely remove watches, since it is certain that
1094     // notification callbacks won't be run while the attention lock is held. Any in-progress
1095     // poll will be interrupted so that the lock should be acquired quickly.
1096     void get_attn_lock(waitqueue_node<T_Mutex> &qnode) noexcept
1097     {
1098         std::unique_lock<T_Mutex> ulock(wait_lock);
1099         attn_waitqueue.queue(&qnode);        
1100         if (! attn_waitqueue.check_head(qnode)) {
1101             if (long_poll_running) {
1102                 // We want to interrupt any in-progress poll so that the attn queue will progress
1103                 // but we don't want to do that unnecessarily. If we are 2nd in the queue then the
1104                 // head must be doing the poll; interrupt it. Otherwise, we assume the 2nd has
1105                 // already interrupted it.
1106                 if (attn_waitqueue.get_second() == &qnode) {
1107                     loop_mech.interrupt_wait();
1108                 }
1109             }
1110             while (! attn_waitqueue.check_head(qnode)) {
1111                 qnode.wait(ulock);
1112             }
1113         }
1114     }
1115     
1116     // Acquire the attention lock, but without interrupting any poll that's in progress
1117     // (prefer to fail in that case).
1118     bool poll_attn_lock(waitqueue_node<T_Mutex> &qnode) noexcept
1119     {
1120         std::unique_lock<T_Mutex> ulock(wait_lock);
1121         if (long_poll_running) {
1122             // There are poll-waiters, bail out
1123             return false;
1124         }
1125
1126         // Nobody's doing a long poll, wait until we're at the head of the attn queue and return
1127         // success:
1128         attn_waitqueue.queue(&qnode);
1129         while (! attn_waitqueue.check_head(qnode)) {
1130             qnode.wait(ulock);
1131         }
1132
1133         return true;
1134     }
1135
1136     // Acquire the poll-wait lock (to be held when polling the AEN mechanism; lower priority than
1137     // the attention lock). The poll-wait lock is used to prevent more than a single thread from
1138     // polling the event loop mechanism at a time; if this is not done, it is basically
1139     // impossible to safely deregister watches.
1140     void get_pollwait_lock(waitqueue_node<T_Mutex> &qnode) noexcept
1141     {
1142         std::unique_lock<T_Mutex> ulock(wait_lock);
1143         if (attn_waitqueue.is_empty()) {
1144             // Queue is completely empty:
1145             attn_waitqueue.queue(&qnode);
1146         }
1147         else {
1148             wait_waitqueue.queue(&qnode);
1149         }
1150         
1151         while (! attn_waitqueue.check_head(qnode)) {
1152             qnode.wait(ulock);
1153         }
1154
1155         long_poll_running = true;
1156     }
1157     
1158     // Release the poll-wait/attention lock.
1159     void release_lock(waitqueue_node<T_Mutex> &qnode) noexcept
1160     {
1161         std::unique_lock<T_Mutex> ulock(wait_lock);
1162         long_poll_running = false;
1163         waitqueue_node<T_Mutex> * nhead = attn_waitqueue.unqueue();
1164         if (nhead != nullptr) {
1165             // Someone else now owns the lock, signal them to wake them up
1166             nhead->signal();
1167         }
1168         else {
1169             // Nobody is waiting in attn_waitqueue (the high-priority queue) so check in
1170             // wait_waitqueue (the low-priority queue)
1171             if (! wait_waitqueue.is_empty()) {
1172                 auto nhead = wait_waitqueue.get_head();
1173                 wait_waitqueue.unqueue();
1174                 attn_waitqueue.queue(nhead);
1175                 long_poll_running = true;
1176                 nhead->signal();
1177             }
1178         }                
1179     }
1180     
1181     void process_signal_rearm(base_signal_watcher * bsw, rearm rearm_type) noexcept
1182     {
1183         // Called with lock held
1184         if (rearm_type == rearm::REARM) {
1185             loop_mech.rearm_signal_watch_nolock(bsw->siginfo.get_signo(), bsw);
1186             if (backend_traits_t::interrupt_after_signal_add) {
1187                 interrupt_if_necessary();
1188             }
1189         }
1190         else if (rearm_type == rearm::REMOVE) {
1191             loop_mech.remove_signal_watch_nolock(bsw->siginfo.get_signo());
1192         }
1193         // Note that signal watchers cannot (currently) be disarmed
1194     }
1195
1196     // Process rearm return from an fd_watcher, including the primary watcher of a bidi_fd_watcher.
1197     // Depending on the rearm value, we re-arm, remove, or disarm the watcher, etc.
1198     rearm process_fd_rearm(base_fd_watcher * bfw, rearm rearm_type) noexcept
1199     {
1200         bool emulatedfd = static_cast<base_watcher *>(bfw)->emulatefd;
1201
1202         if (emulatedfd) {
1203             if (rearm_type == rearm::REARM) {
1204                 bfw->emulate_enabled = true;
1205                 rearm_type = rearm::REQUEUE;
1206             }
1207             else if (rearm_type == rearm::DISARM) {
1208                 bfw->emulate_enabled = false;
1209             }
1210             else if (rearm_type == rearm::NOOP) {
1211                 if (bfw->emulate_enabled) {
1212                     rearm_type = rearm::REQUEUE;
1213                 }
1214             }
1215         }
1216         else  if (rearm_type == rearm::REARM) {
1217             set_fd_enabled_nolock(bfw, bfw->watch_fd,
1218                     bfw->watch_flags & (IN_EVENTS | OUT_EVENTS), true);
1219         }
1220         else if (rearm_type == rearm::DISARM) {
1221             loop_mech.disable_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags);
1222         }
1223         else if (rearm_type == rearm::REMOVE) {
1224             loop_mech.remove_fd_watch_nolock(bfw->watch_fd, bfw->watch_flags);
1225         }
1226         return rearm_type;
1227     }
1228
1229     // Process rearm option from the primary watcher in bidi_fd_watcher
1230     rearm process_primary_rearm(base_bidi_fd_watcher * bdfw, rearm rearm_type) noexcept
1231     {
1232         bool emulatedfd = static_cast<base_watcher *>(bdfw)->emulatefd;
1233
1234         // Called with lock held
1235         if (rearm_type == rearm::REMOVE) {
1236             bdfw->read_removed = 1;
1237
1238             if (backend_traits_t::has_separate_rw_fd_watches) {
1239                 bdfw->watch_flags &= ~IN_EVENTS;
1240                 if (! emulatedfd) {
1241                     loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS);
1242                 }
1243                 return bdfw->write_removed ? rearm::REMOVE : rearm::NOOP;
1244             }
1245             else {
1246                 if (! bdfw->write_removed) {
1247                     if (bdfw->watch_flags & IN_EVENTS) {
1248                         bdfw->watch_flags &= ~IN_EVENTS;
1249                         if (! emulatedfd) {
1250                             set_fd_enabled_nolock(bdfw, bdfw->watch_fd, bdfw->watch_flags,
1251                                     bdfw->watch_flags != 0);
1252                         }
1253                     }
1254                     return rearm::NOOP;
1255                 }
1256                 else {
1257                     // both removed: actually remove
1258                     if (! emulatedfd) {
1259                         loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, 0 /* not used */);
1260                     }
1261                     return rearm::REMOVE;
1262                 }
1263             }
1264         }
1265         else if (rearm_type == rearm::DISARM) {
1266             bdfw->watch_flags &= ~IN_EVENTS;
1267
1268             if (! emulatedfd) {
1269                 if (! backend_traits_t::has_separate_rw_fd_watches) {
1270                     int watch_flags = bdfw->watch_flags  & (IN_EVENTS | OUT_EVENTS);
1271                     set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags, watch_flags != 0);
1272                 }
1273                 else {
1274                     loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, IN_EVENTS);
1275                 }
1276             }
1277         }
1278         else if (rearm_type == rearm::REARM) {
1279             if (! emulatedfd) {
1280                 bdfw->watch_flags |= IN_EVENTS;
1281                 if (! backend_traits_t::has_separate_rw_fd_watches) {
1282                     int watch_flags = bdfw->watch_flags;
1283                     set_fd_enabled_nolock(bdfw, bdfw->watch_fd,
1284                             watch_flags & (IN_EVENTS | OUT_EVENTS), true);
1285                 }
1286                 else {
1287                     set_fd_enabled_nolock(bdfw, bdfw->watch_fd, IN_EVENTS, true);
1288                 }
1289             }
1290             else {
1291                 bdfw->watch_flags &= ~IN_EVENTS;
1292                 rearm_type = rearm::REQUEUE;
1293             }
1294         }
1295         else if (rearm_type == rearm::NOOP) {
1296             if (bdfw->emulatefd) {
1297                 if (bdfw->watch_flags & IN_EVENTS) {
1298                     bdfw->watch_flags &= ~IN_EVENTS;
1299                     rearm_type = rearm::REQUEUE;
1300                 }
1301             }
1302         }
1303         return rearm_type;
1304     }
1305
1306     // Process re-arm for the secondary (output) watcher in a Bi-direction Fd watcher.
1307     rearm process_secondary_rearm(base_bidi_fd_watcher * bdfw, base_watcher * outw, rearm rearm_type) noexcept
1308     {
1309         bool emulatedfd = outw->emulatefd;
1310
1311         // Called with lock held
1312         if (emulatedfd) {
1313             if (rearm_type == rearm::REMOVE) {
1314                 bdfw->write_removed = 1;
1315                 bdfw->watch_flags &= ~OUT_EVENTS;
1316                 rearm_type = bdfw->read_removed ? rearm::REMOVE : rearm::NOOP;
1317             }
1318             else if (rearm_type == rearm::DISARM) {
1319                 bdfw->watch_flags &= ~OUT_EVENTS;
1320             }
1321             else if (rearm_type == rearm::REARM) {
1322                 bdfw->watch_flags &= ~OUT_EVENTS;
1323                 rearm_type = rearm::REQUEUE;
1324             }
1325             else if (rearm_type == rearm::NOOP) {
1326                 if (bdfw->watch_flags & OUT_EVENTS) {
1327                     bdfw->watch_flags &= ~OUT_EVENTS;
1328                     rearm_type = rearm::REQUEUE;
1329                 }
1330             }
1331             return rearm_type;
1332         }
1333         else if (rearm_type == rearm::REMOVE) {
1334             bdfw->write_removed = 1;
1335
1336             if (backend_traits_t::has_separate_rw_fd_watches) {
1337                 bdfw->watch_flags &= ~OUT_EVENTS;
1338                 loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, OUT_EVENTS);
1339                 return bdfw->read_removed ? rearm::REMOVE : rearm::NOOP;
1340             }
1341             else {
1342                 if (! bdfw->read_removed) {
1343                     if (bdfw->watch_flags & OUT_EVENTS) {
1344                         bdfw->watch_flags &= ~OUT_EVENTS;
1345                         set_fd_enabled_nolock(bdfw, bdfw->watch_fd, bdfw->watch_flags, true);
1346                     }
1347                     return rearm::NOOP;
1348                 }
1349                 else {
1350                     // both removed: actually remove
1351                     loop_mech.remove_fd_watch_nolock(bdfw->watch_fd, 0 /* not used */);
1352                     return rearm::REMOVE;
1353                 }
1354             }
1355         }
1356         else if (rearm_type == rearm::DISARM) {
1357             bdfw->watch_flags &= ~OUT_EVENTS;
1358
1359             if (! backend_traits_t::has_separate_rw_fd_watches) {
1360                 int watch_flags = bdfw->watch_flags;
1361                 set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags & (IN_EVENTS | OUT_EVENTS), true);
1362             }
1363             else {
1364                 loop_mech.disable_fd_watch_nolock(bdfw->watch_fd, OUT_EVENTS);
1365             }
1366         }
1367         else if (rearm_type == rearm::REARM) {
1368             bdfw->watch_flags |= OUT_EVENTS;
1369             
1370             if (! backend_traits_t::has_separate_rw_fd_watches) {
1371                 int watch_flags = bdfw->watch_flags;
1372                 set_fd_enabled_nolock(bdfw, bdfw->watch_fd, watch_flags & (IN_EVENTS | OUT_EVENTS), true);
1373             }
1374             else {
1375                 set_fd_enabled_nolock(bdfw, bdfw->watch_fd, OUT_EVENTS | ONE_SHOT, true);
1376             }
1377         }
1378         return rearm_type;
1379     }
1380     
1381     void process_child_watch_rearm(base_child_watcher *bcw, rearm rearm_type) noexcept
1382     {
1383         if (rearm_type == rearm::REMOVE || rearm_type == rearm::DISARM) {
1384             loop_mech.unreserve_child_watch_nolock(bcw->watch_handle);
1385         }
1386     }
1387
1388     void process_timer_rearm(base_timer_watcher *btw, rearm rearm_type) noexcept
1389     {
1390         // Called with lock held
1391         if (rearm_type == rearm::REARM) {
1392             loop_mech.enable_timer_nolock(btw->timer_handle, true, btw->clock);
1393         }
1394         else if (rearm_type == rearm::REMOVE) {
1395             loop_mech.remove_timer_nolock(btw->timer_handle, btw->clock);
1396         }
1397         else if (rearm_type == rearm::DISARM) {
1398             loop_mech.enable_timer_nolock(btw->timer_handle, false, btw->clock);
1399         }
1400     }
1401
1402     // Process queued events; returns true if any events were processed.
1403     //   limit - maximum number of events to process before returning; -1 for
1404     //           no limit.
1405     bool process_events(int limit) noexcept
1406     {
1407         loop_mech.lock.lock();
1408         
1409         if (limit == 0) {
1410             return false;
1411         }
1412         
1413         base_watcher * pqueue = loop_mech.pull_event();
1414         bool active = false;
1415         
1416         while (pqueue != nullptr) {
1417         
1418             pqueue->active = true;
1419             active = true;
1420             
1421             base_bidi_fd_watcher *bbfw = nullptr;
1422             
1423             // (Above variables are initialised only to silence compiler warnings).
1424             
1425             if (pqueue->watchType == watch_type_t::SECONDARYFD) {
1426                 // construct a pointer to the main watcher, using integer arithmetic to avoid undefined
1427                 // pointer arithmetic:
1428                 uintptr_t rp = (uintptr_t)pqueue;
1429
1430                 // Here we take the offset of a member from a non-standard-layout class, which is
1431                 // specified to have undefined result by the C++ language standard, but which
1432                 // in practice works fine:
1433                 _Pragma ("GCC diagnostic push")
1434                 _Pragma ("GCC diagnostic ignored \"-Winvalid-offsetof\"")
1435                 rp -= offsetof(base_bidi_fd_watcher, out_watcher);
1436                 _Pragma ("GCC diagnostic pop")
1437                 bbfw = (base_bidi_fd_watcher *)rp;
1438
1439                 // issue a secondary dispatch:
1440                 bbfw->dispatch_second(this);
1441                 pqueue = loop_mech.pull_event();
1442                 continue;
1443             }
1444
1445             pqueue->dispatch(this);
1446             if (limit > 0) {
1447                 limit--;
1448                 if (limit == 0) break;
1449             }
1450             pqueue = loop_mech.pull_event();
1451         }
1452         
1453         loop_mech.lock.unlock();
1454         return active;
1455     }
1456
1457     public:
1458     
1459     using fd_watcher = dprivate::fd_watcher<my_event_loop_t>;
1460     using bidi_fd_watcher = dprivate::bidi_fd_watcher<my_event_loop_t>;
1461     using signal_watcher = dprivate::signal_watcher<my_event_loop_t>;
1462     using child_proc_watcher = dprivate::child_proc_watcher<my_event_loop_t>;
1463     using timer = dprivate::timer<my_event_loop_t>;
1464     
1465     template <typename D> using fd_watcher_impl = dprivate::fd_watcher_impl<my_event_loop_t, D>;
1466     template <typename D> using bidi_fd_watcher_impl = dprivate::bidi_fd_watcher_impl<my_event_loop_t, D>;
1467     template <typename D> using signal_watcher_impl = dprivate::signal_watcher_impl<my_event_loop_t, D>;
1468     template <typename D> using child_proc_watcher_impl = dprivate::child_proc_watcher_impl<my_event_loop_t, D>;
1469     template <typename D> using timer_impl = dprivate::timer_impl<my_event_loop_t, D>;
1470
1471     // Poll the event loop and process any pending events (up to a limit). If no events are pending, wait
1472     // for and process at least one event.
1473     void run(int limit = -1) noexcept
1474     {
1475         // Poll the mechanism first, in case high-priority events are pending:
1476         waitqueue_node<T_Mutex> qnode;
1477         get_pollwait_lock(qnode);
1478         loop_mech.pull_events(false);
1479         release_lock(qnode);
1480
1481         while (! process_events(limit)) {
1482             // Pull events from the AEN mechanism and insert them in our internal queue:
1483             get_pollwait_lock(qnode);
1484             loop_mech.pull_events(true);
1485             release_lock(qnode);
1486         }
1487     }
1488
1489     // Poll the event loop and process any pending events (up to a limit).
1490     void poll(int limit = -1) noexcept
1491     {
1492         waitqueue_node<T_Mutex> qnode;
1493         if (poll_attn_lock(qnode)) {
1494             loop_mech.pull_events(false);
1495             release_lock(qnode);
1496         }
1497
1498         process_events(limit);
1499     }
1500
1501     // Get the current time corresponding to a specific clock.
1502     //   ts - the timespec variable to receive the time
1503     //   clock - specifies the clock
1504     //   force_update (default = false) - if true, the time returned will be updated from
1505     //       the system rather than being a previously cached result. It may be more
1506     //       accurate, but note that reading from a system clock may be relatively expensive.
1507     void get_time(timespec &ts, clock_type clock, bool force_update = false) noexcept
1508     {
1509         loop_mech.get_time(ts, clock, force_update);
1510     }
1511
1512     void get_time(time_val &tv, clock_type clock, bool force_update = false) noexcept
1513     {
1514         loop_mech.get_time(tv, clock, force_update);
1515     }
1516
1517     event_loop() { }
1518     event_loop(const event_loop &other) = delete;
1519 };
1520
1521 typedef event_loop<null_mutex> event_loop_n;
1522 typedef event_loop<std::mutex> event_loop_th;
1523
1524 namespace dprivate {
1525
1526 // Posix signal event watcher
1527 template <typename EventLoop>
1528 class signal_watcher : private dprivate::base_signal_watcher<typename EventLoop::loop_traits_t::sigdata_t>
1529 {
1530     template <typename, typename> friend class signal_watcher_impl;
1531
1532     using base_watcher = dprivate::base_watcher;
1533     using T_Mutex = typename EventLoop::mutex_t;
1534     
1535     public:
1536     using event_loop_t = EventLoop;
1537     using siginfo_p = typename signal_watcher::siginfo_p;
1538
1539     // Register this watcher to watch the specified signal.
1540     // If an attempt is made to register with more than one event loop at
1541     // a time, behaviour is undefined. The signal should be masked before
1542     // call.
1543     inline void add_watch(event_loop_t &eloop, int signo, int prio = DEFAULT_PRIORITY)
1544     {
1545         base_watcher::init();
1546         this->priority = prio;
1547         this->siginfo.set_signo(signo);
1548         eloop.register_signal(this, signo);
1549     }
1550     
1551     inline void deregister(event_loop_t &eloop) noexcept
1552     {
1553         eloop.deregister(this, this->siginfo.get_signo());
1554     }
1555     
1556     template <typename T>
1557     static signal_watcher<event_loop_t> *add_watch(event_loop_t &eloop, int signo, T watch_hndlr)
1558     {
1559         class lambda_sig_watcher : public signal_watcher_impl<event_loop_t, lambda_sig_watcher>
1560         {
1561             private:
1562             T watch_hndlr;
1563
1564             public:
1565             lambda_sig_watcher(T watch_handlr_a) : watch_hndlr(watch_handlr_a)
1566             {
1567                 //
1568             }
1569
1570             rearm received(event_loop_t &eloop, int signo, siginfo_p siginfo)
1571             {
1572                 return watch_hndlr(eloop, signo, siginfo);
1573             }
1574
1575             void watch_removed() noexcept override
1576             {
1577                 delete this;
1578             }
1579         };
1580
1581         lambda_sig_watcher * lsw = new lambda_sig_watcher(watch_hndlr);
1582         lsw->add_watch(eloop, signo);
1583         return lsw;
1584     }
1585
1586     // virtual rearm received(EventLoop &eloop, int signo, siginfo_p siginfo) = 0;
1587 };
1588
1589 template <typename EventLoop, typename Derived>
1590 class signal_watcher_impl : public signal_watcher<EventLoop>
1591 {
1592     void dispatch(void *loop_ptr) noexcept override
1593     {
1594         EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
1595         loop_access::get_base_lock(loop).unlock();
1596
1597         auto rearm_type = static_cast<Derived *>(this)->received(loop, this->siginfo.get_signo(), this->siginfo);
1598
1599         loop_access::get_base_lock(loop).lock();
1600
1601         if (rearm_type != rearm::REMOVED) {
1602
1603             this->active = false;
1604             if (this->deleteme) {
1605                 // We don't want a watch that is marked "deleteme" to re-arm itself.
1606                 rearm_type = rearm::REMOVE;
1607             }
1608
1609             loop_access::process_signal_rearm(loop, this, rearm_type);
1610
1611             post_dispatch(loop, this, rearm_type);
1612         }
1613     }
1614 };
1615
1616 // Posix file descriptor event watcher
1617 template <typename EventLoop>
1618 class fd_watcher : private dprivate::base_fd_watcher
1619 {
1620     template <typename, typename> friend class fd_watcher_impl;
1621
1622     using base_watcher = dprivate::base_watcher;
1623     using mutex_t = typename EventLoop::mutex_t;
1624
1625     protected:
1626     
1627     // Set the types of event to watch. Only supported if loop_traits_t_t::has_bidi_fd_watch
1628     // is true; otherwise has unspecified behavior.
1629     // Only safe to call from within the callback handler (fdEvent). Might not take
1630     // effect until the current callback handler returns with REARM.
1631     void set_watch_flags(int newFlags)
1632     {
1633         this->watch_flags = newFlags;
1634     }
1635     
1636     public:
1637     
1638     using event_loop_t = EventLoop;
1639
1640     // Register a file descriptor watcher with an event loop. Flags
1641     // can be any combination of dasynq::IN_EVENTS / dasynq::OUT_EVENTS.
1642     // Exactly one of IN_EVENTS/OUT_EVENTS must be specified if the event
1643     // loop does not support bi-directional fd watchers (i.e. if
1644     // ! loop_traits_t::has_bidi_fd_watch).
1645     //
1646     // Mechanisms supporting dual watchers allow for two watchers for a
1647     // single file descriptor (one watching read status and the other
1648     // write status). Others mechanisms support only a single watcher
1649     // per file descriptor. Adding a watcher beyond what is supported
1650     // causes undefined behavior.
1651     //
1652     // Can fail with std::bad_alloc or std::system_error.
1653     void add_watch(event_loop_t &eloop, int fd, int flags, bool enabled = true, int prio = DEFAULT_PRIORITY)
1654     {
1655         base_watcher::init();
1656         this->priority = prio;
1657         this->watch_fd = fd;
1658         this->watch_flags = flags;
1659         eloop.register_fd(this, fd, flags, enabled, true);
1660     }
1661
1662     void add_watch_noemu(event_loop_t &eloop, int fd, int flags, bool enabled = true, int prio = DEFAULT_PRIORITY)
1663     {
1664         base_watcher::init();
1665         this->priority = prio;
1666         this->watch_fd = fd;
1667         this->watch_flags = flags;
1668         eloop.register_fd(this, fd, flags, enabled, false);
1669     }
1670     
1671     int get_watched_fd()
1672     {
1673         return this->watch_fd;
1674     }
1675     
1676     // Deregister a file descriptor watcher.
1677     //
1678     // If other threads may be polling the event loop, it is not safe to assume
1679     // the watcher is unregistered until the watch_removed() callback is issued
1680     // (which will not occur until the event handler returns, if it is active).
1681     // In a single threaded environment, it is safe to delete the watcher after
1682     // calling this method as long as the handler (if it is active) accesses no
1683     // internal state and returns rearm::REMOVED.
1684     void deregister(event_loop_t &eloop) noexcept
1685     {
1686         eloop.deregister(this, this->watch_fd);
1687     }
1688     
1689     void set_enabled(event_loop_t &eloop, bool enable) noexcept
1690     {
1691         std::lock_guard<mutex_t> guard(eloop.get_base_lock());
1692         if (this->emulatefd) {
1693             if (enable && ! this->emulate_enabled) {
1694                 loop_access::requeue_watcher(eloop, this);
1695             }
1696             this->emulate_enabled = enable;
1697         }
1698         else {
1699             eloop.set_fd_enabled_nolock(this, this->watch_fd, this->watch_flags, enable);
1700         }
1701
1702         if (! enable) {
1703             eloop.dequeue_watcher(this);
1704         }
1705     }
1706     
1707     // Add an Fd watch via a lambda. The watch is allocated dynamically and destroys
1708     // itself when removed from the event loop.
1709     template <typename T>
1710     static fd_watcher<EventLoop> *add_watch(event_loop_t &eloop, int fd, int flags, T watchHndlr)
1711     {
1712         class lambda_fd_watcher : public fd_watcher_impl<event_loop_t, lambda_fd_watcher>
1713         {
1714             private:
1715             T watchHndlr;
1716
1717             public:
1718             lambda_fd_watcher(T watchHandlr_a) : watchHndlr(watchHandlr_a)
1719             {
1720                 //
1721             }
1722
1723             rearm fd_event(event_loop_t &eloop, int fd, int flags)
1724             {
1725                 return watchHndlr(eloop, fd, flags);
1726             }
1727
1728             void watch_removed() noexcept override
1729             {
1730                 delete this;
1731             }
1732         };
1733         
1734         lambda_fd_watcher * lfd = new lambda_fd_watcher(watchHndlr);
1735         lfd->add_watch(eloop, fd, flags);
1736         return lfd;
1737     }
1738     
1739     // virtual rearm fd_event(EventLoop &eloop, int fd, int flags) = 0;
1740 };
1741
1742 template <typename EventLoop, typename Derived>
1743 class fd_watcher_impl : public fd_watcher<EventLoop>
1744 {
1745     void dispatch(void *loop_ptr) noexcept override
1746     {
1747         EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
1748
1749         // In case emulating, clear enabled here; REARM or explicit set_enabled will re-enable.
1750         this->emulate_enabled = false;
1751
1752         loop_access::get_base_lock(loop).unlock();
1753
1754         auto rearm_type = static_cast<Derived *>(this)->fd_event(loop, this->watch_fd, this->event_flags);
1755
1756         loop_access::get_base_lock(loop).lock();
1757
1758         if (rearm_type != rearm::REMOVED) {
1759             this->event_flags = 0;
1760             this->active = false;
1761             if (this->deleteme) {
1762                 // We don't want a watch that is marked "deleteme" to re-arm itself.
1763                 rearm_type = rearm::REMOVE;
1764             }
1765
1766             rearm_type = loop_access::process_fd_rearm(loop, this, rearm_type);
1767
1768             post_dispatch(loop, this, rearm_type);
1769         }
1770     }
1771 };
1772
1773
1774 // A Bi-directional file descriptor watcher with independent read- and write- channels.
1775 // This watcher type has two event notification methods which can both potentially be
1776 // active at the same time.
1777 template <typename EventLoop>
1778 class bidi_fd_watcher : private dprivate::base_bidi_fd_watcher
1779 {
1780     template <typename, typename> friend class bidi_fd_watcher_impl;
1781
1782     using base_watcher = dprivate::base_watcher;
1783     using mutex_t = typename EventLoop::mutex_t;
1784     
1785     void set_watch_enabled(EventLoop &eloop, bool in, bool b)
1786     {
1787         int events = in ? IN_EVENTS : OUT_EVENTS;
1788         auto orig_flags = this->watch_flags;
1789         
1790         if (b) {
1791             this->watch_flags |= events;
1792         }
1793         else {
1794             this->watch_flags &= ~events;
1795         }
1796
1797         dprivate::base_watcher * watcher = in ? this : &this->out_watcher;
1798
1799         if (! watcher->emulatefd) {
1800             if (EventLoop::loop_traits_t::has_separate_rw_fd_watches) {
1801                 eloop.set_fd_enabled_nolock(this, this->watch_fd, events | ONE_SHOT, b);
1802             }
1803             else {
1804                 eloop.set_fd_enabled_nolock(this, this->watch_fd,
1805                         (this->watch_flags & IO_EVENTS) | ONE_SHOT,
1806                         (this->watch_flags & IO_EVENTS) != 0);
1807             }
1808         }
1809         else {
1810             // emulation: if enabling a previously disabled watcher, must queue now:
1811             if (b && (orig_flags != this->watch_flags)) {
1812                 this->watch_flags = orig_flags;
1813                 loop_access::requeue_watcher(eloop, watcher);
1814             }
1815         }
1816
1817         if (! b) {
1818             eloop.dequeue_watcher(watcher);
1819         }
1820     }
1821     
1822     public:
1823
1824     using event_loop_t = EventLoop;
1825
1826     void set_in_watch_enabled(event_loop_t &eloop, bool b) noexcept
1827     {
1828         eloop.get_base_lock().lock();
1829         set_watch_enabled(eloop, true, b);
1830         eloop.get_base_lock().unlock();
1831     }
1832     
1833     void set_out_watch_enabled(event_loop_t &eloop, bool b) noexcept
1834     {
1835         eloop.get_base_lock().lock();
1836         set_watch_enabled(eloop, false, b);
1837         eloop.get_base_lock().unlock();
1838     }
1839     
1840     // Set the watch flags, which enables/disables both the in-watch and the out-watch accordingly.
1841     //
1842     // Concurrency: this method can only be called if
1843     //  - it does not enable a watcher that might currently be active
1844     ///   - unless the event loop will not be polled while the watcher is active.
1845     // (i.e. it is ok to call setWatchFlags from within the readReady/writeReady handlers if no other
1846     //  thread will poll the event loop; it is always ok to *dis*able a watcher that might be active,
1847     //  though the re-arm action returned by the callback may undo the effect).
1848     void set_watches(event_loop_t &eloop, int new_flags) noexcept
1849     {
1850         std::lock_guard<mutex_t> guard(eloop.get_base_lock());
1851         bool use_emulation = this->emulatefd || this->out_watcher.emulatefd;
1852         if (use_emulation || EventLoop::loop_traits_t::has_separate_rw_fd_watches) {
1853             set_watch_enabled(eloop, true, (new_flags & IN_EVENTS) != 0);
1854             set_watch_enabled(eloop, false, (new_flags & OUT_EVENTS) != 0);
1855         }
1856         else {
1857             this->watch_flags = (this->watch_flags & ~IO_EVENTS) | new_flags;
1858             eloop.set_fd_enabled_nolock((dprivate::base_watcher *) this, this->watch_fd, this->watch_flags & IO_EVENTS, true);
1859         }
1860     }
1861     
1862     // Register a bi-direction file descriptor watcher with an event loop. Flags
1863     // can be any combination of dasynq::IN_EVENTS / dasynq::OUT_EVENTS.
1864     //
1865     // Can fail with std::bad_alloc or std::system_error.
1866     void add_watch(event_loop_t &eloop, int fd, int flags, int inprio = DEFAULT_PRIORITY, int outprio = DEFAULT_PRIORITY)
1867     {
1868         base_watcher::init();
1869         this->out_watcher.base_watcher::init();
1870         this->watch_fd = fd;
1871         this->watch_flags = flags | dprivate::multi_watch;
1872         this->read_removed = false;
1873         this->write_removed = false;
1874         this->priority = inprio;
1875         this->set_priority(this->out_watcher, outprio);
1876         eloop.register_fd(this, fd, flags, true);
1877     }
1878
1879     void add_watch_noemu(event_loop_t &eloop, int fd, int flags, int inprio = DEFAULT_PRIORITY, int outprio = DEFAULT_PRIORITY)
1880     {
1881         base_watcher::init();
1882         this->out_watcher.base_watcher::init();
1883         this->watch_fd = fd;
1884         this->watch_flags = flags | dprivate::multi_watch;
1885         this->read_removed = false;
1886         this->write_removed = false;
1887         this->priority = inprio;
1888         this->set_priority(this->out_watcher, outprio);
1889         eloop.register_fd(this, fd, flags, false);
1890     }
1891
1892     int get_watched_fd()
1893     {
1894         return this->watch_fd;
1895     }
1896     
1897     // Deregister a bi-direction file descriptor watcher.
1898     //
1899     // If other threads may be polling the event loop, it is not safe to assume
1900     // the watcher is unregistered until the watch_removed() callback is issued
1901     // (which will not occur until the event handler returns, if it is active).
1902     // In a single threaded environment, it is safe to delete the watcher after
1903     // calling this method as long as the handler (if it is active) accesses no
1904     // internal state and returns rearm::REMOVED.
1905     void deregister(event_loop_t &eloop) noexcept
1906     {
1907         eloop.deregister(this, this->watch_fd);
1908     }
1909     
1910     template <typename T>
1911     static bidi_fd_watcher<event_loop_t> *add_watch(event_loop_t &eloop, int fd, int flags, T watch_hndlr)
1912     {
1913         class lambda_bidi_watcher : public bidi_fd_watcher_impl<event_loop_t, lambda_bidi_watcher>
1914         {
1915             private:
1916             T watch_hndlr;
1917
1918             public:
1919             lambda_bidi_watcher(T watch_handlr_a) : watch_hndlr(watch_handlr_a)
1920             {
1921                 //
1922             }
1923
1924             rearm read_ready(event_loop_t &eloop, int fd)
1925             {
1926                 return watch_hndlr(eloop, fd, IN_EVENTS);
1927             }
1928
1929             rearm write_ready(event_loop_t &eloop, int fd)
1930             {
1931                 return watch_hndlr(eloop, fd, OUT_EVENTS);
1932             }
1933
1934             void watch_removed() noexcept override
1935             {
1936                 delete this;
1937             }
1938         };
1939
1940         lambda_bidi_watcher * lfd = new lambda_bidi_watcher(watch_hndlr);
1941         lfd->add_watch(eloop, fd, flags);
1942         return lfd;
1943     }
1944
1945     // virtual rearm read_ready(EventLoop &eloop, int fd) noexcept = 0;
1946     // virtual rearm write_ready(EventLoop &eloop, int fd) noexcept = 0;
1947 };
1948
1949 template <typename EventLoop, typename Derived>
1950 class bidi_fd_watcher_impl : public bidi_fd_watcher<EventLoop>
1951 {
1952     void dispatch(void *loop_ptr) noexcept override
1953     {
1954         EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
1955         this->emulate_enabled = false;
1956         loop_access::get_base_lock(loop).unlock();
1957
1958         auto rearm_type = static_cast<Derived *>(this)->read_ready(loop, this->watch_fd);
1959
1960         loop_access::get_base_lock(loop).lock();
1961
1962         if (rearm_type != rearm::REMOVED) {
1963             this->event_flags &= ~IN_EVENTS;
1964             this->active = false;
1965             if (this->deleteme) {
1966                 // We don't want a watch that is marked "deleteme" to re-arm itself.
1967                 rearm_type = rearm::REMOVE;
1968             }
1969
1970             rearm_type = loop_access::process_primary_rearm(loop, this, rearm_type);
1971
1972             auto &outwatcher = bidi_fd_watcher<EventLoop>::out_watcher;
1973             post_dispatch(loop, this, &outwatcher, rearm_type);
1974         }
1975     }
1976
1977     void dispatch_second(void *loop_ptr) noexcept override
1978     {
1979         auto &outwatcher = bidi_fd_watcher<EventLoop>::out_watcher;
1980
1981         EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
1982         loop_access::get_base_lock(loop).unlock();
1983
1984         auto rearm_type = static_cast<Derived *>(this)->write_ready(loop, this->watch_fd);
1985
1986         loop_access::get_base_lock(loop).lock();
1987
1988         if (rearm_type != rearm::REMOVED) {
1989             this->event_flags &= ~OUT_EVENTS;
1990             outwatcher.active = false;
1991             if (outwatcher.deleteme) {
1992                 // We don't want a watch that is marked "deleteme" to re-arm itself.
1993                 rearm_type = rearm::REMOVE;
1994             }
1995
1996             rearm_type = loop_access::process_secondary_rearm(loop, this, &outwatcher, rearm_type);
1997
1998             if (rearm_type == rearm::REQUEUE) {
1999                 post_dispatch(loop, &outwatcher, rearm_type);
2000             }
2001             else {
2002                 post_dispatch(loop, this, &outwatcher, rearm_type);
2003             }
2004         }
2005     }
2006 };
2007
2008 // Child process event watcher
2009 template <typename EventLoop>
2010 class child_proc_watcher : private dprivate::base_child_watcher
2011 {
2012     template <typename, typename> friend class child_proc_watcher_impl;
2013
2014     using base_watcher = dprivate::base_watcher;
2015     using mutex_t = typename EventLoop::mutex_t;
2016
2017     public:
2018
2019     using event_loop_t = EventLoop;
2020
2021     // send a signal to this process, if it is still running, in a race-free manner.
2022     // return is as for POSIX kill(); return is -1 with errno=ESRCH if process has
2023     // already terminated.
2024     int send_signal(event_loop_t &loop, int signo) noexcept
2025     {
2026         auto reaper_mutex = loop.get_reaper_mutex();
2027         std::lock_guard<decltype(reaper_mutex)> guard(reaper_mutex);
2028
2029         if (this->child_termd) {
2030             errno = ESRCH;
2031             return -1;
2032         }
2033
2034         return kill(this->watch_pid, signo);
2035     }
2036
2037     // Reserve resources for a child watcher with the given event loop.
2038     // Reservation can fail with std::bad_alloc. Some backends do not support
2039     // reservation (it will always fail) - check loop_traits_t::supports_childwatch_reservation.
2040     void reserve_watch(event_loop_t &eloop)
2041     {
2042         eloop.reserve_child_watch(this);
2043     }
2044     
2045     void unreserve(event_loop_t &eloop)
2046     {
2047         eloop.unreserve(this);
2048     }
2049     
2050     // Register a watcher for the given child process with an event loop.
2051     // Registration can fail with std::bad_alloc.
2052     // Note that in multi-threaded programs, use of this function may be prone to a
2053     // race condition such that the child terminates before the watcher is registered.
2054     void add_watch(event_loop_t &eloop, pid_t child, int prio = DEFAULT_PRIORITY)
2055     {
2056         base_watcher::init();
2057         this->watch_pid = child;
2058         this->priority = prio;
2059         eloop.register_child(this, child);
2060     }
2061     
2062     // Register a watcher for the given child process with an event loop,
2063     // after having reserved resources previously (using reserveWith).
2064     // Registration cannot fail.
2065     // Note that in multi-threaded programs, use of this function may be prone to a
2066     // race condition such that the child terminates before the watcher is registered;
2067     // use the "fork" member function to avoid this.
2068     void add_reserved(event_loop_t &eloop, pid_t child, int prio = DEFAULT_PRIORITY) noexcept
2069     {
2070         base_watcher::init();
2071         this->watch_pid = child;
2072         this->priority = prio;
2073         eloop.register_reserved_child(this, child);
2074     }
2075     
2076     void deregister(event_loop_t &eloop, pid_t child) noexcept
2077     {
2078         eloop.deregister(this, child);
2079     }
2080     
2081     // Stop watching the currently watched child, but retain watch reservation.
2082     void stop_watch(event_loop_t &eloop) noexcept
2083     {
2084         eloop.stop_watch(this);
2085     }
2086
2087     // Fork and watch the child with this watcher on the given event loop.
2088     // If resource limitations prevent the child process from being watched, it is
2089     // terminated immediately (or if the implementation allows, never started),
2090     // and a suitable std::system_error or std::bad_alloc exception is thrown.
2091     // Returns:
2092     // - the child pid in the parent
2093     // - 0 in the child
2094     pid_t fork(event_loop_t &eloop, bool from_reserved = false, int prio = DEFAULT_PRIORITY)
2095     {
2096         base_watcher::init();
2097         this->priority = prio;
2098
2099         if (EventLoop::loop_traits_t::supports_childwatch_reservation) {
2100             // Reserve a watch, fork, then claim reservation
2101             if (! from_reserved) {
2102                 reserve_watch(eloop);
2103             }
2104             
2105             auto &lock = eloop.get_base_lock();
2106             lock.lock();
2107             
2108             pid_t child = ::fork();
2109             if (child == -1) {
2110                 // Unreserve watch.
2111                 lock.unlock();
2112                 unreserve(eloop);
2113                 throw std::system_error(errno, std::system_category());
2114             }
2115             
2116             if (child == 0) {
2117                 // I am the child
2118                 lock.unlock(); // may not really be necessary
2119                 return 0;
2120             }
2121             
2122             // Register this watcher.
2123             this->watch_pid = child;
2124             eloop.register_reserved_child_nolock(this, child);
2125             lock.unlock();
2126             return child;
2127         }
2128         else {
2129             int pipefds[2];
2130             if (pipe2(pipefds, O_CLOEXEC) == -1) {
2131                 throw std::system_error(errno, std::system_category());
2132             }
2133             
2134             std::lock_guard<mutex_t> guard(eloop.get_base_lock());
2135             
2136             pid_t child = ::fork();
2137             if (child == -1) {
2138                 throw std::system_error(errno, std::system_category());
2139             }
2140             
2141             if (child == 0) {
2142                 // I am the child
2143                 close(pipefds[1]);
2144                 
2145                 // Wait for message from parent before continuing:
2146                 int rr;
2147                 int r = read(pipefds[0], &rr, sizeof(rr));
2148                 while (r == -1 && errno == EINTR) {
2149                     r = read(pipefds[0], &rr, sizeof(rr));
2150                 }
2151                 
2152                 if (r <= 0) _exit(0);
2153                 
2154                 close(pipefds[0]);
2155                 return 0;
2156             }
2157             
2158             close(pipefds[0]); // close read end
2159             
2160             // Register this watcher.
2161             try {
2162                 this->watch_pid = child;
2163                 eloop.register_child(this, child);
2164                 
2165                 // Continue in child (it doesn't matter what is written):
2166                 write(pipefds[1], &pipefds, sizeof(int));
2167                 close(pipefds[1]);
2168                 
2169                 return child;
2170             }
2171             catch (...) {
2172                 close(pipefds[1]);
2173                 throw;
2174             }
2175         }
2176     }
2177     
2178     // virtual rearm child_status(EventLoop &eloop, pid_t child, int status) = 0;
2179 };
2180
2181 template <typename EventLoop, typename Derived>
2182 class child_proc_watcher_impl : public child_proc_watcher<EventLoop>
2183 {
2184     void dispatch(void *loop_ptr) noexcept override
2185     {
2186         EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
2187         loop_access::get_base_lock(loop).unlock();
2188
2189         auto rearm_type = static_cast<Derived *>(this)->status_change(loop, this->watch_pid, this->child_status);
2190
2191         loop_access::get_base_lock(loop).lock();
2192
2193         if (rearm_type != rearm::REMOVED) {
2194
2195             this->active = false;
2196             if (this->deleteme) {
2197                 // We don't want a watch that is marked "deleteme" to re-arm itself.
2198                 rearm_type = rearm::REMOVE;
2199             }
2200
2201             loop_access::process_child_watch_rearm(loop, this, rearm_type);
2202
2203             // rearm_type = loop.process??;
2204             post_dispatch(loop, this, rearm_type);
2205         }
2206     }
2207 };
2208
2209 template <typename EventLoop>
2210 class timer : private base_timer_watcher
2211 {
2212     template <typename, typename> friend class timer_impl;
2213     using base_t = base_timer_watcher;
2214     using mutex_t = typename EventLoop::mutex_t;
2215
2216     public:
2217     using event_loop_t = EventLoop;
2218     
2219     void add_timer(event_loop_t &eloop, clock_type clock = clock_type::MONOTONIC, int prio = DEFAULT_PRIORITY)
2220     {
2221         base_watcher::init();
2222         this->priority = prio;
2223         this->clock = clock;
2224         this->intervals = 0;
2225         eloop.register_timer(this, clock);
2226     }
2227     
2228     void arm_timer(event_loop_t &eloop, const timespec &timeout) noexcept
2229     {
2230         eloop.set_timer(this, timeout, base_t::clock);
2231     }
2232     
2233     void arm_timer(event_loop_t &eloop, const timespec &timeout, const timespec &interval) noexcept
2234     {
2235         eloop.set_timer(this, timeout, interval, base_t::clock);
2236     }
2237
2238     // Arm timer, relative to now:
2239     void arm_timer_rel(event_loop_t &eloop, const timespec &timeout) noexcept
2240     {
2241         eloop.set_timer_rel(this, timeout, base_t::clock);
2242     }
2243     
2244     void arm_timer_rel(event_loop_t &eloop, const timespec &timeout,
2245             const timespec &interval) noexcept
2246     {
2247         eloop.set_timer_rel(this, timeout, interval, base_t::clock);
2248     }
2249     
2250     void stop_timer(event_loop_t &eloop) noexcept
2251     {
2252         eloop.stop_timer(this, base_t::clock);
2253     }
2254
2255     void set_enabled(event_loop_t &eloop, clock_type clock, bool enabled) noexcept
2256     {
2257         std::lock_guard<mutex_t> guard(eloop.get_base_lock());
2258         eloop.set_timer_enabled_nolock(this, clock, enabled);
2259         if (! enabled) {
2260             eloop.dequeue_watcher(this);
2261         }
2262     }
2263
2264     void deregister(event_loop_t &eloop) noexcept
2265     {
2266         eloop.deregister(this, this->clock);
2267     }
2268
2269     template <typename T>
2270     static timer<EventLoop> *add_timer(EventLoop &eloop, clock_type clock, bool relative,
2271             const timespec &timeout, const timespec &interval, T watch_hndlr)
2272     {
2273         class lambda_timer : public timer_impl<event_loop_t, lambda_timer>
2274         {
2275             private:
2276             T watch_hndlr;
2277
2278             public:
2279             lambda_timer(T watch_handlr_a) : watch_hndlr(watch_handlr_a)
2280             {
2281                 //
2282             }
2283
2284             rearm timer_expiry(event_loop_t &eloop, int intervals)
2285             {
2286                 return watch_hndlr(eloop, intervals);
2287             }
2288
2289             void watch_removed() noexcept override
2290             {
2291                 delete this;
2292             }
2293         };
2294
2295         lambda_timer * lt = new lambda_timer(watch_hndlr);
2296         lt->add_timer(eloop, clock);
2297         if (relative) {
2298             lt->arm_timer_rel(eloop, timeout, interval);
2299         }
2300         else {
2301             lt->arm_timer(eloop, timeout, interval);
2302         }
2303         return lt;
2304     }
2305
2306     // Timer expired, and the given number of intervals have elapsed before
2307     // expiry event was queued. Normally intervals == 1 to indicate no
2308     // overrun.
2309     // virtual rearm timer_expiry(event_loop_t &eloop, int intervals) = 0;
2310 };
2311
2312 template <typename EventLoop, typename Derived>
2313 class timer_impl : public timer<EventLoop>
2314 {
2315     void dispatch(void *loop_ptr) noexcept override
2316     {
2317         EventLoop &loop = *static_cast<EventLoop *>(loop_ptr);
2318         loop_access::get_base_lock(loop).unlock();
2319
2320         auto intervals_report = this->intervals;
2321         this->intervals = 0;
2322         auto rearm_type = static_cast<Derived *>(this)->timer_expiry(loop, intervals_report);
2323
2324         loop_access::get_base_lock(loop).lock();
2325
2326         if (rearm_type != rearm::REMOVED) {
2327
2328             this->active = false;
2329             if (this->deleteme) {
2330                 // We don't want a watch that is marked "deleteme" to re-arm itself.
2331                 rearm_type = rearm::REMOVE;
2332             }
2333
2334             loop_access::process_timer_rearm(loop, this, rearm_type);
2335
2336             post_dispatch(loop, this, rearm_type);
2337         }
2338     }
2339 };
2340
2341 }  // namespace dasynq::dprivate
2342 }  // namespace dasynq
2343
2344 #endif