From: Davin McCall Date: Sun, 5 Jun 2016 02:29:12 +0000 (+0100) Subject: Beginnings of kqueue support (far from complete) X-Git-Tag: v0.01~15 X-Git-Url: https://git.librecmc.org/?a=commitdiff_plain;h=25a1fa421fccaaacf968c4e345f464c258dd4437;p=oweals%2Fdinit.git Beginnings of kqueue support (far from complete) --- diff --git a/src/dasync/dasync-aen.h b/src/dasync/dasync-aen.h deleted file mode 100644 index 8dae802..0000000 --- a/src/dasync/dasync-aen.h +++ /dev/null @@ -1,440 +0,0 @@ -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include -#include - -namespace dasync { - -// Event type bits -constexpr unsigned int in_events = 1; -constexpr unsigned int out_events = 2; -constexpr unsigned int err_events = 4; - -constexpr unsigned int one_shot = 8; - - -template class EpollLoop; - -class EpollTraits -{ - template friend class EpollLoop; - - public: - - class SigInfo - { - template friend class EpollLoop; - - struct signalfd_siginfo info; - - public: - int get_signo() { return info.ssi_signo; } - int get_sicode() { return info.ssi_code; } - int get_siint() { return info.ssi_int; } - int get_ssiptr() { return info.ssi_ptr; } - int get_ssiaddr() { return info.ssi_addr; } - - void set_signo(int signo) { info.ssi_signo = signo; } - }; - - class FD_r; - - // File descriptor optional storage. If the mechanism can return the file descriptor, this - // class will be empty, otherwise it can hold a file descriptor. - class FD_s { - friend class FD_r; - - // Epoll doesn't return the file descriptor (it can, but it can't return both file - // descriptor and user data). - int fd; - }; - - // File descriptor reference (passed to event callback). If the mechanism can return the - // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor - // must be stored in an FD_s instance. - class FD_r { - public: - int getFd(FD_s ss) - { - return ss.fd; - } - }; -}; - - -template class EpollLoop : public Base -{ - int epfd; // epoll fd - int sigfd; // signalfd fd; -1 if not initialised - sigset_t sigmask; - - std::unordered_map sigdataMap; - - // Base contains: - // lock - a lock that can be used to protect internal structure. - // receive*() methods will be called with lock held. - // receiveSignal(SigInfo &, user *) noexcept - // receiveFdEvent(FD_r, user *, int flags) noexcept - - using SigInfo = EpollTraits::SigInfo; - using FD_r = typename EpollTraits::FD_r; - - void processEvents(epoll_event *events, int r) - { - std::lock_guard guard(Base::lock); - - for (int i = 0; i < r; i++) { - void * ptr = events[i].data.ptr; - - if (ptr == &sigfd) { - // Signal - SigInfo siginfo; - while (true) { - int r = read(sigfd, &siginfo.info, sizeof(siginfo.info)); - if (r == -1) break; - if (siginfo.get_signo() != SIGCHLD) { - // TODO remove the special exception for SIGCHLD? - sigdelset(&sigmask, siginfo.get_signo()); - } - auto iter = sigdataMap.find(siginfo.get_signo()); - if (iter != sigdataMap.end()) { - void *userdata = (*iter).second; - Base::receiveSignal(siginfo, userdata); - } - } - signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); - } - else { - int flags = 0; - (events[i].events & EPOLLIN) && (flags |= in_events); - (events[i].events & EPOLLHUP) && (flags |= in_events); - (events[i].events & EPOLLOUT) && (flags |= out_events); - (events[i].events & EPOLLERR) && (flags |= err_events); - Base::receiveFdEvent(*this, FD_r(), ptr, flags); - } - } - } - - public: - - /** - * EpollLoop constructor. - * - * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised. - */ - EpollLoop() : sigfd(-1) - { - epfd = epoll_create1(EPOLL_CLOEXEC); - if (epfd == -1) { - throw std::system_error(errno, std::system_category()); - } - sigemptyset(&sigmask); - } - - ~EpollLoop() - { - close(epfd); - if (sigfd != -1) { - close(sigfd); - } - } - - // flags: in_events | out_events - void addFdWatch(int fd, void *userdata, int flags) - { - struct epoll_event epevent; - // epevent.data.fd = fd; - epevent.data.ptr = userdata; - epevent.events = 0; - - if (flags & one_shot) { - epevent.events = EPOLLONESHOT; - } - if (flags & in_events) { - epevent.events |= EPOLLIN; - } - if (flags & out_events) { - epevent.events |= EPOLLOUT; - } - - if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epevent) == -1) { - throw new std::system_error(errno, std::system_category()); - } - } - - void removeFdWatch(int fd) - { - epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr); - } - - void removeFdWatch_nolock(int fd) - { - removeFdWatch(fd); - } - - // Note this will *replace* the old flags with the new, that is, - // it can enable *or disable* read/write events. - void enableFdWatch(int fd, void *userdata, int flags) - { - struct epoll_event epevent; - // epevent.data.fd = fd; - epevent.data.ptr = userdata; - epevent.events = 0; - - if (flags & one_shot) { - epevent.events = EPOLLONESHOT; - } - if (flags & in_events) { - epevent.events |= EPOLLIN; - } - if (flags & out_events) { - epevent.events |= EPOLLOUT; - } - - if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) { - throw new std::system_error(errno, std::system_category()); - } - } - - void enableFdWatch_nolock(int fd, void *userdata, int flags) - { - enableFdWatch(fd, userdata, flags); - } - - void disableFdWatch(int fd) - { - struct epoll_event epevent; - // epevent.data.fd = fd; - epevent.data.ptr = nullptr; - epevent.events = 0; - - // Epoll documentation says that hangup will still be reported, need to check - // whether this is really the case. Suspect it is really only the case if - // EPOLLIN is set. - if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) { - throw new std::system_error(errno, std::system_category()); - } - } - - // Note signal should be masked before call. - void addSignalWatch(int signo, void *userdata) - { - std::lock_guard guard(Base::lock); - - sigdataMap[signo] = userdata; - - // Modify the signal fd to watch the new signal - bool was_no_sigfd = (sigfd == -1); - sigaddset(&sigmask, signo); - sigfd = signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); - if (sigfd == -1) { - throw new std::system_error(errno, std::system_category()); - } - - if (was_no_sigfd) { - // Add the signalfd to the epoll set. - struct epoll_event epevent; - epevent.data.ptr = &sigfd; - epevent.events = EPOLLIN; - // No need for EPOLLONESHOT - we can pull the signals out - // as we see them. - if (epoll_ctl(epfd, EPOLL_CTL_ADD, sigfd, &epevent) == -1) { - close(sigfd); - throw new std::system_error(errno, std::system_category()); - } - } - } - - // Note, called with lock held: - void rearmSignalWatch_nolock(int signo) noexcept - { - sigaddset(&sigmask, signo); - signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); - } - - void removeSignalWatch_nolock(int signo) noexcept - { - sigdelset(&sigmask, signo); - signalfd(sigfd, &sigmask, 0); - } - - void removeSignalWatch(int signo) noexcept - { - std::lock_guard guard(Base::lock); - removeSignalWatch_nolock(signo); - } - - // If events are pending, process an unspecified number of them. - // If no events are pending, wait until one event is received and - // process this event (and possibly any other events received - // simultaneously). - // If processing an event removes a watch, there is a possibility - // that the watched event will still be reported (if it has - // occurred) before pullEvents() returns. - // - // do_wait - if false, returns immediately if no events are - // pending. - void pullEvents(bool do_wait) - { - epoll_event events[16]; - int r = epoll_wait(epfd, events, 16, do_wait ? -1 : 0); - if (r == -1 || r == 0) { - // signal or no events - return; - } - - processEvents(events, r); - } - - // If events are pending, process one of them. - // If no events are pending, wait until one event is received and - // process this event. - // - // do_wait - if false, returns immediately if no events are - // pending. - void pullOneEvent(bool do_wait) - { - epoll_event events[1]; - int r = epoll_wait(epfd, events, 1, do_wait ? -1 : 0); - if (r == -1 || r == 0) { - // signal or no events - return; - } - - processEvents(events, r); - } - - // Interrupt any current poll operation (pullEvents/pullOneEvent), causing - // it to to return immediately. - void interruptWait() - { - // TODO - } -}; - -// Map of pid_t to void *, with possibility of reserving entries so that mappings can -// be later added with no danger of allocator exhaustion (bad_alloc). -class pid_map -{ - using pair = std::pair; - std::unordered_map base_map; - std::vector backup_vector; - - // Number of entries in backup_vector that are actually in use (as opposed - // to simply reserved): - int backup_size = 0; - - public: - using entry = std::pair; - - entry get(pid_t key) noexcept - { - auto it = base_map.find(key); - if (it == base_map.end()) { - // Not in map; look in vector - for (int i = 0; i < backup_size; i++) { - if (backup_vector[i].first == key) { - return entry(true, backup_vector[i].second); - } - } - - return entry(false, nullptr); - } - - return entry(true, it->second); - } - - entry erase(pid_t key) noexcept - { - auto iter = base_map.find(key); - if (iter != base_map.end()) { - entry r(true, iter->second); - base_map.erase(iter); - return r; - } - for (int i = 0; i < backup_size; i++) { - if (backup_vector[i].first == key) { - entry r(true, backup_vector[i].second); - backup_vector.erase(backup_vector.begin() + i); - return r; - } - } - return entry(false, nullptr); - } - - // Throws bad_alloc on reservation failure - void reserve() - { - backup_vector.resize(backup_vector.size() + 1); - } - - void add(pid_t key, void *val) // throws std::bad_alloc - { - base_map[key] = val; - } - - void add_from_reserve(pid_t key, void *val) noexcept - { - try { - base_map[key] = val; - backup_vector.resize(backup_vector.size() - 1); - } - catch (std::bad_alloc &) { - // We couldn't add into the map, use the reserve: - backup_vector[backup_size++] = pair(key, val); - } - } -}; - -template class ChildProcEvents : public Base -{ - private: - pid_map child_waiters; - - using SigInfo = typename Base::SigInfo; - - protected: - void receiveSignal(SigInfo &siginfo, void *userdata) - { - if (siginfo.get_signo() == SIGCHLD) { - int status; - pid_t child; - while ((child = waitpid(-1, &status, WNOHANG)) > 0) { - pid_map::entry ent = child_waiters.erase(child); - if (ent.first) { - Base::receiveChildStat(child, status, ent.second); - } - } - } - else { - Base::receiveSignal(siginfo, userdata); - } - } - - public: - void reserveChildWatch() - { - child_waiters.reserve(); - } - - void addChildWatch(pid_t child, void *val) - { - child_waiters.add(child, val); - } - - void addReservedChildWatch(pid_t child, void *val) noexcept - { - child_waiters.add_from_reserve(child, val); - } -}; - -} // end namespace diff --git a/src/dasync/dasync-epoll.h b/src/dasync/dasync-epoll.h new file mode 100644 index 0000000..8dae802 --- /dev/null +++ b/src/dasync/dasync-epoll.h @@ -0,0 +1,440 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace dasync { + +// Event type bits +constexpr unsigned int in_events = 1; +constexpr unsigned int out_events = 2; +constexpr unsigned int err_events = 4; + +constexpr unsigned int one_shot = 8; + + +template class EpollLoop; + +class EpollTraits +{ + template friend class EpollLoop; + + public: + + class SigInfo + { + template friend class EpollLoop; + + struct signalfd_siginfo info; + + public: + int get_signo() { return info.ssi_signo; } + int get_sicode() { return info.ssi_code; } + int get_siint() { return info.ssi_int; } + int get_ssiptr() { return info.ssi_ptr; } + int get_ssiaddr() { return info.ssi_addr; } + + void set_signo(int signo) { info.ssi_signo = signo; } + }; + + class FD_r; + + // File descriptor optional storage. If the mechanism can return the file descriptor, this + // class will be empty, otherwise it can hold a file descriptor. + class FD_s { + friend class FD_r; + + // Epoll doesn't return the file descriptor (it can, but it can't return both file + // descriptor and user data). + int fd; + }; + + // File descriptor reference (passed to event callback). If the mechanism can return the + // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor + // must be stored in an FD_s instance. + class FD_r { + public: + int getFd(FD_s ss) + { + return ss.fd; + } + }; +}; + + +template class EpollLoop : public Base +{ + int epfd; // epoll fd + int sigfd; // signalfd fd; -1 if not initialised + sigset_t sigmask; + + std::unordered_map sigdataMap; + + // Base contains: + // lock - a lock that can be used to protect internal structure. + // receive*() methods will be called with lock held. + // receiveSignal(SigInfo &, user *) noexcept + // receiveFdEvent(FD_r, user *, int flags) noexcept + + using SigInfo = EpollTraits::SigInfo; + using FD_r = typename EpollTraits::FD_r; + + void processEvents(epoll_event *events, int r) + { + std::lock_guard guard(Base::lock); + + for (int i = 0; i < r; i++) { + void * ptr = events[i].data.ptr; + + if (ptr == &sigfd) { + // Signal + SigInfo siginfo; + while (true) { + int r = read(sigfd, &siginfo.info, sizeof(siginfo.info)); + if (r == -1) break; + if (siginfo.get_signo() != SIGCHLD) { + // TODO remove the special exception for SIGCHLD? + sigdelset(&sigmask, siginfo.get_signo()); + } + auto iter = sigdataMap.find(siginfo.get_signo()); + if (iter != sigdataMap.end()) { + void *userdata = (*iter).second; + Base::receiveSignal(siginfo, userdata); + } + } + signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); + } + else { + int flags = 0; + (events[i].events & EPOLLIN) && (flags |= in_events); + (events[i].events & EPOLLHUP) && (flags |= in_events); + (events[i].events & EPOLLOUT) && (flags |= out_events); + (events[i].events & EPOLLERR) && (flags |= err_events); + Base::receiveFdEvent(*this, FD_r(), ptr, flags); + } + } + } + + public: + + /** + * EpollLoop constructor. + * + * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised. + */ + EpollLoop() : sigfd(-1) + { + epfd = epoll_create1(EPOLL_CLOEXEC); + if (epfd == -1) { + throw std::system_error(errno, std::system_category()); + } + sigemptyset(&sigmask); + } + + ~EpollLoop() + { + close(epfd); + if (sigfd != -1) { + close(sigfd); + } + } + + // flags: in_events | out_events + void addFdWatch(int fd, void *userdata, int flags) + { + struct epoll_event epevent; + // epevent.data.fd = fd; + epevent.data.ptr = userdata; + epevent.events = 0; + + if (flags & one_shot) { + epevent.events = EPOLLONESHOT; + } + if (flags & in_events) { + epevent.events |= EPOLLIN; + } + if (flags & out_events) { + epevent.events |= EPOLLOUT; + } + + if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epevent) == -1) { + throw new std::system_error(errno, std::system_category()); + } + } + + void removeFdWatch(int fd) + { + epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr); + } + + void removeFdWatch_nolock(int fd) + { + removeFdWatch(fd); + } + + // Note this will *replace* the old flags with the new, that is, + // it can enable *or disable* read/write events. + void enableFdWatch(int fd, void *userdata, int flags) + { + struct epoll_event epevent; + // epevent.data.fd = fd; + epevent.data.ptr = userdata; + epevent.events = 0; + + if (flags & one_shot) { + epevent.events = EPOLLONESHOT; + } + if (flags & in_events) { + epevent.events |= EPOLLIN; + } + if (flags & out_events) { + epevent.events |= EPOLLOUT; + } + + if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) { + throw new std::system_error(errno, std::system_category()); + } + } + + void enableFdWatch_nolock(int fd, void *userdata, int flags) + { + enableFdWatch(fd, userdata, flags); + } + + void disableFdWatch(int fd) + { + struct epoll_event epevent; + // epevent.data.fd = fd; + epevent.data.ptr = nullptr; + epevent.events = 0; + + // Epoll documentation says that hangup will still be reported, need to check + // whether this is really the case. Suspect it is really only the case if + // EPOLLIN is set. + if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) { + throw new std::system_error(errno, std::system_category()); + } + } + + // Note signal should be masked before call. + void addSignalWatch(int signo, void *userdata) + { + std::lock_guard guard(Base::lock); + + sigdataMap[signo] = userdata; + + // Modify the signal fd to watch the new signal + bool was_no_sigfd = (sigfd == -1); + sigaddset(&sigmask, signo); + sigfd = signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); + if (sigfd == -1) { + throw new std::system_error(errno, std::system_category()); + } + + if (was_no_sigfd) { + // Add the signalfd to the epoll set. + struct epoll_event epevent; + epevent.data.ptr = &sigfd; + epevent.events = EPOLLIN; + // No need for EPOLLONESHOT - we can pull the signals out + // as we see them. + if (epoll_ctl(epfd, EPOLL_CTL_ADD, sigfd, &epevent) == -1) { + close(sigfd); + throw new std::system_error(errno, std::system_category()); + } + } + } + + // Note, called with lock held: + void rearmSignalWatch_nolock(int signo) noexcept + { + sigaddset(&sigmask, signo); + signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); + } + + void removeSignalWatch_nolock(int signo) noexcept + { + sigdelset(&sigmask, signo); + signalfd(sigfd, &sigmask, 0); + } + + void removeSignalWatch(int signo) noexcept + { + std::lock_guard guard(Base::lock); + removeSignalWatch_nolock(signo); + } + + // If events are pending, process an unspecified number of them. + // If no events are pending, wait until one event is received and + // process this event (and possibly any other events received + // simultaneously). + // If processing an event removes a watch, there is a possibility + // that the watched event will still be reported (if it has + // occurred) before pullEvents() returns. + // + // do_wait - if false, returns immediately if no events are + // pending. + void pullEvents(bool do_wait) + { + epoll_event events[16]; + int r = epoll_wait(epfd, events, 16, do_wait ? -1 : 0); + if (r == -1 || r == 0) { + // signal or no events + return; + } + + processEvents(events, r); + } + + // If events are pending, process one of them. + // If no events are pending, wait until one event is received and + // process this event. + // + // do_wait - if false, returns immediately if no events are + // pending. + void pullOneEvent(bool do_wait) + { + epoll_event events[1]; + int r = epoll_wait(epfd, events, 1, do_wait ? -1 : 0); + if (r == -1 || r == 0) { + // signal or no events + return; + } + + processEvents(events, r); + } + + // Interrupt any current poll operation (pullEvents/pullOneEvent), causing + // it to to return immediately. + void interruptWait() + { + // TODO + } +}; + +// Map of pid_t to void *, with possibility of reserving entries so that mappings can +// be later added with no danger of allocator exhaustion (bad_alloc). +class pid_map +{ + using pair = std::pair; + std::unordered_map base_map; + std::vector backup_vector; + + // Number of entries in backup_vector that are actually in use (as opposed + // to simply reserved): + int backup_size = 0; + + public: + using entry = std::pair; + + entry get(pid_t key) noexcept + { + auto it = base_map.find(key); + if (it == base_map.end()) { + // Not in map; look in vector + for (int i = 0; i < backup_size; i++) { + if (backup_vector[i].first == key) { + return entry(true, backup_vector[i].second); + } + } + + return entry(false, nullptr); + } + + return entry(true, it->second); + } + + entry erase(pid_t key) noexcept + { + auto iter = base_map.find(key); + if (iter != base_map.end()) { + entry r(true, iter->second); + base_map.erase(iter); + return r; + } + for (int i = 0; i < backup_size; i++) { + if (backup_vector[i].first == key) { + entry r(true, backup_vector[i].second); + backup_vector.erase(backup_vector.begin() + i); + return r; + } + } + return entry(false, nullptr); + } + + // Throws bad_alloc on reservation failure + void reserve() + { + backup_vector.resize(backup_vector.size() + 1); + } + + void add(pid_t key, void *val) // throws std::bad_alloc + { + base_map[key] = val; + } + + void add_from_reserve(pid_t key, void *val) noexcept + { + try { + base_map[key] = val; + backup_vector.resize(backup_vector.size() - 1); + } + catch (std::bad_alloc &) { + // We couldn't add into the map, use the reserve: + backup_vector[backup_size++] = pair(key, val); + } + } +}; + +template class ChildProcEvents : public Base +{ + private: + pid_map child_waiters; + + using SigInfo = typename Base::SigInfo; + + protected: + void receiveSignal(SigInfo &siginfo, void *userdata) + { + if (siginfo.get_signo() == SIGCHLD) { + int status; + pid_t child; + while ((child = waitpid(-1, &status, WNOHANG)) > 0) { + pid_map::entry ent = child_waiters.erase(child); + if (ent.first) { + Base::receiveChildStat(child, status, ent.second); + } + } + } + else { + Base::receiveSignal(siginfo, userdata); + } + } + + public: + void reserveChildWatch() + { + child_waiters.reserve(); + } + + void addChildWatch(pid_t child, void *val) + { + child_waiters.add(child, val); + } + + void addReservedChildWatch(pid_t child, void *val) noexcept + { + child_waiters.add_from_reserve(child, val); + } +}; + +} // end namespace diff --git a/src/dasync/dasync-kqueue.h b/src/dasync/dasync-kqueue.h new file mode 100644 index 0000000..e7b33a4 --- /dev/null +++ b/src/dasync/dasync-kqueue.h @@ -0,0 +1,427 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +namespace dasync { + +// Event type bits +constexpr unsigned int in_events = 1; +constexpr unsigned int out_events = 2; +constexpr unsigned int err_events = 4; + +constexpr unsigned int one_shot = 8; + + +template class KqueueLoop; + +class KqueueTraits +{ + template friend class KqueueLoop; + + public: + + class SigInfo + { + template friend class KqueueLoop; + + siginfo_t info; + + public: + int get_signo() { return info.si_signo; } + int get_sicode() { return info.si_code; } + char * get_ssiaddr() { return info.si_addr; } + + void set_signo(int signo) { info.si_signo = signo; } + }; + + class FD_r; + + // File descriptor optional storage. If the mechanism can return the file descriptor, this + // class will be empty, otherwise it can hold a file descriptor. + class FD_s { + // Epoll doesn't return the file descriptor (it can, but it can't return both file + // descriptor and user data). + // TODO make true empty. + }; + + // File descriptor reference (passed to event callback). If the mechanism can return the + // file descriptor, this class holds the file descriptor. Otherwise, the file descriptor + // must be stored in an FD_s instance. + class FD_r { + int fd; + public: + int getFd(FD_s ss) + { + return fd; + } + }; +}; + + +template class KqueueLoop : public Base +{ + int kqfd; // epoll fd + + // Base contains: + // lock - a lock that can be used to protect internal structure. + // receive*() methods will be called with lock held. + // receiveSignal(SigInfo &, user *) noexcept + // receiveFdEvent(FD_r, user *, int flags) noexcept + + using SigInfo = KqueueTraits::SigInfo; + using FD_r = typename KqueueTraits::FD_r; + + void processEvents(struct kevent *events, int r) + { + std::lock_guard guard(Base::lock); + + for (int i = 0; i < r; i++) { + if (events[i].filter = EVFILT_SIGNAL) { + siginfo_t siginfo; + sigset_t sset; + sigemptyset(&sset); + sigaddset(&sset, events[i].ident); + //int r = sigwaitinfo(&sset, &siginfo); + // OpenBSD doesn't have sigwaitinfo... + int r = 0; + if (r > 0) { + Base::receiveSignal(siginfo, (void *)events[i].udata); + } + } + + + //else { + // int flags = 0; + // (events[i].events & EPOLLIN) && (flags |= in_events); + // (events[i].events & EPOLLHUP) && (flags |= in_events); + // (events[i].events & EPOLLOUT) && (flags |= out_events); + // (events[i].events & EPOLLERR) && (flags |= err_events); + // Base::receiveFdEvent(*this, FD_r(), ptr, flags); + //} + } + } + + public: + + /** + * KqueueLoop constructor. + * + * Throws std::system_error or std::bad_alloc if the event loop cannot be initialised. + */ + KqueueLoop() + { + //epfd = epoll_create1(EPOLL_CLOEXEC); + //if (epfd == -1) { + // throw std::system_error(errno, std::system_category()); + //} + //sigemptyset(&sigmask); + } + + ~KqueueLoop() + { + //close(epfd); + //if (sigfd != -1) { + // close(sigfd); + //} + } + + // flags: in_events | out_events + void addFdWatch(int fd, void *userdata, int flags) + { + //struct epoll_event epevent; + // epevent.data.fd = fd; + //epevent.data.ptr = userdata; + //epevent.events = 0; + + //if (flags & one_shot) { + // epevent.events = EPOLLONESHOT; + //} + //if (flags & in_events) { + // epevent.events |= EPOLLIN; + //} + //if (flags & out_events) { + // epevent.events |= EPOLLOUT; + //} + + //if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &epevent) == -1) { + // throw new std::system_error(errno, std::system_category()); + //} + } + + void removeFdWatch(int fd) + { + //epoll_ctl(epfd, EPOLL_CTL_DEL, fd, nullptr); + } + + void removeFdWatch_nolock(int fd) + { + removeFdWatch(fd); + } + + // Note this will *replace* the old flags with the new, that is, + // it can enable *or disable* read/write events. + void enableFdWatch(int fd, void *userdata, int flags) + { + //struct epoll_event epevent; + // epevent.data.fd = fd; + //epevent.data.ptr = userdata; + //epevent.events = 0; + + //if (flags & one_shot) { + // epevent.events = EPOLLONESHOT; + //} + //if (flags & in_events) { + // epevent.events |= EPOLLIN; + //} + //if (flags & out_events) { + // epevent.events |= EPOLLOUT; + //} + + //if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) { + // throw new std::system_error(errno, std::system_category()); + //} + } + + void enableFdWatch_nolock(int fd, void *userdata, int flags) + { + enableFdWatch(fd, userdata, flags); + } + + void disableFdWatch(int fd) + { + //struct epoll_event epevent; + // epevent.data.fd = fd; + //epevent.data.ptr = nullptr; + //epevent.events = 0; + + // Epoll documentation says that hangup will still be reported, need to check + // whether this is really the case. Suspect it is really only the case if + // EPOLLIN is set. + //if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &epevent) == -1) { + // throw new std::system_error(errno, std::system_category()); + //} + } + + // Note signal should be masked before call. + void addSignalWatch(int signo, void *userdata) + { + //std::lock_guard guard(Base::lock); + + //sigdataMap[signo] = userdata; + + // Modify the signal fd to watch the new signal + //bool was_no_sigfd = (sigfd == -1); + //sigaddset(&sigmask, signo); + //sigfd = signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); + //if (sigfd == -1) { + // throw new std::system_error(errno, std::system_category()); + //} + + //if (was_no_sigfd) { + // Add the signalfd to the epoll set. + // struct epoll_event epevent; + // epevent.data.ptr = &sigfd; + // epevent.events = EPOLLIN; + // No need for EPOLLONESHOT - we can pull the signals out + // as we see them. + // if (epoll_ctl(epfd, EPOLL_CTL_ADD, sigfd, &epevent) == -1) { + // close(sigfd); + // throw new std::system_error(errno, std::system_category()); + // } + //} + } + + // Note, called with lock held: + void rearmSignalWatch_nolock(int signo) noexcept + { + //sigaddset(&sigmask, signo); + //signalfd(sigfd, &sigmask, SFD_NONBLOCK | SFD_CLOEXEC); + } + + void removeSignalWatch_nolock(int signo) noexcept + { + //sigdelset(&sigmask, signo); + //signalfd(sigfd, &sigmask, 0); + } + + void removeSignalWatch(int signo) noexcept + { + //std::lock_guard guard(Base::lock); + //removeSignalWatch_nolock(signo); + } + + // If events are pending, process an unspecified number of them. + // If no events are pending, wait until one event is received and + // process this event (and possibly any other events received + // simultaneously). + // If processing an event removes a watch, there is a possibility + // that the watched event will still be reported (if it has + // occurred) before pullEvents() returns. + // + // do_wait - if false, returns immediately if no events are + // pending. + void pullEvents(bool do_wait) + { + //epoll_event events[16]; + //int r = epoll_wait(epfd, events, 16, do_wait ? -1 : 0); + //if (r == -1 || r == 0) { + // signal or no events + // return; + //} + + //processEvents(events, r); + } + + // If events are pending, process one of them. + // If no events are pending, wait until one event is received and + // process this event. + // + // do_wait - if false, returns immediately if no events are + // pending. + void pullOneEvent(bool do_wait) + { + //epoll_event events[1]; + //int r = epoll_wait(epfd, events, 1, do_wait ? -1 : 0); + //if (r == -1 || r == 0) { + // signal or no events + // return; + //} + + //processEvents(events, r); + } + + // Interrupt any current poll operation (pullEvents/pullOneEvent), causing + // it to to return immediately. + void interruptWait() + { + // TODO + } +}; + +// Map of pid_t to void *, with possibility of reserving entries so that mappings can +// be later added with no danger of allocator exhaustion (bad_alloc). +class pid_map +{ + using pair = std::pair; + std::unordered_map base_map; + std::vector backup_vector; + + // Number of entries in backup_vector that are actually in use (as opposed + // to simply reserved): + int backup_size = 0; + + public: + using entry = std::pair; + + entry get(pid_t key) noexcept + { + auto it = base_map.find(key); + if (it == base_map.end()) { + // Not in map; look in vector + for (int i = 0; i < backup_size; i++) { + if (backup_vector[i].first == key) { + return entry(true, backup_vector[i].second); + } + } + + return entry(false, nullptr); + } + + return entry(true, it->second); + } + + entry erase(pid_t key) noexcept + { + auto iter = base_map.find(key); + if (iter != base_map.end()) { + entry r(true, iter->second); + base_map.erase(iter); + return r; + } + for (int i = 0; i < backup_size; i++) { + if (backup_vector[i].first == key) { + entry r(true, backup_vector[i].second); + backup_vector.erase(backup_vector.begin() + i); + return r; + } + } + return entry(false, nullptr); + } + + // Throws bad_alloc on reservation failure + void reserve() + { + backup_vector.resize(backup_vector.size() + 1); + } + + void add(pid_t key, void *val) // throws std::bad_alloc + { + base_map[key] = val; + } + + void add_from_reserve(pid_t key, void *val) noexcept + { + try { + base_map[key] = val; + backup_vector.resize(backup_vector.size() - 1); + } + catch (std::bad_alloc &) { + // We couldn't add into the map, use the reserve: + backup_vector[backup_size++] = pair(key, val); + } + } +}; + +template class ChildProcEvents : public Base +{ + private: + pid_map child_waiters; + + using SigInfo = typename Base::SigInfo; + + protected: + void receiveSignal(SigInfo &siginfo, void *userdata) + { + if (siginfo.get_signo() == SIGCHLD) { + int status; + pid_t child; + while ((child = waitpid(-1, &status, WNOHANG)) > 0) { + pid_map::entry ent = child_waiters.erase(child); + if (ent.first) { + Base::receiveChildStat(child, status, ent.second); + } + } + } + else { + Base::receiveSignal(siginfo, userdata); + } + } + + public: + void reserveChildWatch() + { + child_waiters.reserve(); + } + + void addChildWatch(pid_t child, void *val) + { + child_waiters.add(child, val); + } + + void addReservedChildWatch(pid_t child, void *val) noexcept + { + child_waiters.add_from_reserve(child, val); + } +}; + +} // end namespace diff --git a/src/dasync/dasync.h b/src/dasync/dasync.h index d9008d7..e3e8b51 100644 --- a/src/dasync/dasync.h +++ b/src/dasync/dasync.h @@ -1,7 +1,19 @@ #ifndef DASYNC_H_INCLUDED #define DASYNC_H_INCLUDED -#include "dasync-aen.h" +#if defined(__OpenBSD__) +#define HAVE_KQUEUE 1 +#endif + +#if defined(__linux__) +#define HAVE_EPOLL 1 +#endif + +#if defined(HAVE_KQUEUE) +#include "dasync-kqueue.h" +#elif defined(HAVE_EPOLL) +#include "dasync-epoll.h" +#endif #include #include