#include <unistd.h>
-#include "dasync.h"
+#include "dasynq.h"
#include "dinit-log.h"
#include "control-cmds.h"
// Control connection for dinit
-using namespace dasync;
-using EventLoop_t = EventLoop<NullMutex>;
+using namespace dasynq;
+using eventloop_t = event_loop<null_mutex>;
-class ControlConn;
-class ControlConnWatcher;
+class control_conn_t;
+class control_conn_watcher;
// forward-declaration of callback:
-static dasync::Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
+static rearm control_conn_cb(eventloop_t *loop, control_conn_watcher *watcher, int revents);
// Pointer to the control connection that is listening for rollback completion
-extern ControlConn * rollback_handler_conn;
+extern control_conn_t * rollback_handler_conn;
extern int active_control_conns;
// (1 byte) packet length (including all fields)
// N bytes: packet data (N = (length - 2))
-class ServiceSet;
-class ServiceRecord;
+class service_set;
+class service_record;
-class ControlConnWatcher : public PosixBidiFdWatcher<NullMutex>
+class control_conn_watcher : public eventloop_t::bidi_fd_watcher_impl<control_conn_watcher>
{
- inline Rearm receiveEvent(EventLoop_t * loop, int fd, int flags) noexcept;
+ inline rearm receive_event(eventloop_t &loop, int fd, int flags) noexcept;
- Rearm readReady(EventLoop_t * loop, int fd) noexcept override
+ eventloop_t * event_loop;
+
+ public:
+ control_conn_watcher(eventloop_t & event_loop_p) : event_loop(&event_loop_p)
{
- return receiveEvent(loop, fd, in_events);
+ // constructor
}
-
- Rearm writeReady(EventLoop_t * loop, int fd) noexcept override
+
+ rearm read_ready(eventloop_t &loop, int fd) noexcept
{
- return receiveEvent(loop, fd, out_events);
+ return receive_event(loop, fd, IN_EVENTS);
}
- public:
- int fd; // TODO this is already stored, find a better way to access it.
- EventLoop_t * eventLoop;
-
- void setWatchFlags(int flags)
+ rearm write_ready(eventloop_t &loop, int fd) noexcept
{
- PosixBidiFdWatcher<NullMutex>::setWatchFlags(eventLoop, flags);
+ return receive_event(loop, fd, OUT_EVENTS);
}
-
- void registerWith(EventLoop_t *loop, int fd, int flags)
+
+ void set_watches(int flags)
{
- this->fd = fd;
- this->eventLoop = loop;
- PosixBidiFdWatcher<NullMutex>::registerWith(loop, fd, flags);
+ eventloop_t::bidi_fd_watcher::set_watches(*event_loop, flags);
}
};
-inline Rearm ControlConnWatcher::receiveEvent(EventLoop_t * loop, int fd, int flags) noexcept
+inline rearm control_conn_watcher::receive_event(eventloop_t &loop, int fd, int flags) noexcept
{
- return control_conn_cb(loop, this, flags);
+ return control_conn_cb(&loop, this, flags);
}
-class ControlConn : private ServiceListener
+class control_conn_t : private service_listener
{
- friend dasync::Rearm control_conn_cb(EventLoop_t *loop, ControlConnWatcher *watcher, int revents);
+ friend rearm control_conn_cb(eventloop_t *loop, control_conn_watcher *watcher, int revents);
- ControlConnWatcher iob;
- EventLoop_t *loop;
- ServiceSet *service_set;
+ control_conn_watcher iob;
+ eventloop_t &loop;
+ service_set *services;
bool bad_conn_close = false; // close when finished output?
bool oom_close = false; // send final 'out of memory' indicator
// The packet length before we need to re-check if the packet is complete.
- // processPacket() will not be called until the packet reaches this size.
+ // process_packet() will not be called until the packet reaches this size.
int chklen;
// Receive buffer
- CPBuffer<1024> rbuf;
+ cpbuffer<1024> rbuf;
template <typename T> using list = std::list<T>;
template <typename T> using vector = std::vector<T>;
// A mapping between service records and their associated numerical identifier used
// in communction
using handle_t = uint32_t;
- std::unordered_multimap<ServiceRecord *, handle_t> serviceKeyMap;
- std::unordered_map<handle_t, ServiceRecord *> keyServiceMap;
+ std::unordered_multimap<service_record *, handle_t> serviceKeyMap;
+ std::unordered_map<handle_t, service_record *> keyServiceMap;
// Buffer for outgoing packets. Each outgoing back is represented as a vector<char>.
list<vector<char>> outbuf;
unsigned outpkt_index = 0;
// Queue a packet to be sent
- // Returns: false if the packet could not be queued and the connection has been
- // destroyed;
+ // Returns: false if the packet could not be queued and a suitable error packet
+ // could not be sent/queued (the connection should be closed);
// true (with bad_conn_close == false) if the packet was successfully
// queued;
// true (with bad_conn_close == true) if the packet was not successfully
- // queued.
+ // queued (but a suitable error packate has been queued).
// The in/out watch enabled state will also be set appropriately.
- bool queuePacket(vector<char> &&v) noexcept;
- bool queuePacket(const char *pkt, unsigned size) noexcept;
-
- // Process a packet. Can cause the ControlConn to be deleted iff there are no
- // outgoing packets queued (returns false).
+ bool queue_packet(vector<char> &&v) noexcept;
+ bool queue_packet(const char *pkt, unsigned size) noexcept;
+
+ // Process a packet.
+ // Returns: true (with bad_conn_close == false) if successful
+ // true (with bad_conn_close == true) if an error packet was queued
+ // false if an error occurred but no error packet could be queued
+ // (connection should be closed).
// Throws:
// std::bad_alloc - if an out-of-memory condition prevents processing
- bool processPacket();
+ bool process_packet();
// Process a STARTSERVICE/STOPSERVICE packet. May throw std::bad_alloc.
- bool processStartStop(int pktType);
+ bool process_start_stop(int pktType);
// Process a FINDSERVICE/LOADSERVICE packet. May throw std::bad_alloc.
- bool processFindLoad(int pktType);
+ bool process_find_load(int pktType);
// Process an UNPINSERVICE packet. May throw std::bad_alloc.
- bool processUnpinService();
+ bool process_unpin_service();
+
+ bool list_services();
- // Notify that data is ready to be read from the socket. Returns true if the connection was
- // deleted.
- bool dataReady() noexcept;
+ // Notify that data is ready to be read from the socket. Returns true if the connection should
+ // be closed.
+ bool data_ready() noexcept;
- bool sendData() noexcept;
+ bool send_data() noexcept;
// Allocate a new handle for a service; may throw std::bad_alloc
- handle_t allocateServiceHandle(ServiceRecord *record);
+ handle_t allocate_service_handle(service_record *record);
- ServiceRecord *findServiceForKey(uint32_t key)
+ service_record *find_service_for_key(uint32_t key)
{
try {
return keyServiceMap.at(key);
}
// Close connection due to out-of-memory condition.
- void doOomClose()
+ void do_oom_close()
{
bad_conn_close = true;
oom_close = true;
- iob.setWatchFlags(out_events);
+ iob.set_watches(OUT_EVENTS);
}
// Process service event broadcast.
- void serviceEvent(ServiceRecord * service, ServiceEvent event) noexcept final override
+ // Note that this can potentially be called during packet processing (upon issuing
+ // service start/stop orders etc).
+ void service_event(service_record * service, service_event_t event) noexcept final override
{
// For each service handle corresponding to the event, send an information packet.
auto range = serviceKeyMap.equal_range(service);
pkt.push_back(*p++);
}
pkt.push_back(static_cast<char>(event));
- queuePacket(std::move(pkt));
+ queue_packet(std::move(pkt));
++i;
}
}
catch (std::bad_alloc &exc) {
- doOomClose();
+ do_oom_close();
}
}
public:
- ControlConn(EventLoop_t * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0)
+ control_conn_t(eventloop_t &loop, service_set * services_p, int fd)
+ : iob(loop), loop(loop), services(services_p), chklen(0)
{
- iob.registerWith(loop, fd, in_events);
+ iob.add_watch(loop, fd, IN_EVENTS);
active_control_conns++;
}
- bool rollbackComplete() noexcept;
+ bool rollback_complete() noexcept;
- virtual ~ControlConn() noexcept;
+ virtual ~control_conn_t() noexcept;
};
-static dasync::Rearm control_conn_cb(EventLoop_t * loop, ControlConnWatcher * watcher, int revents)
+static rearm control_conn_cb(eventloop_t * loop, control_conn_watcher * watcher, int revents)
{
- using Rearm = dasync::Rearm;
-
- char * cc_addr = (reinterpret_cast<char *>(watcher)) - offsetof(ControlConn, iob);
- ControlConn *conn = reinterpret_cast<ControlConn *>(cc_addr);
- if (revents & in_events) {
- if (conn->dataReady()) {
- // ControlConn was deleted
- return Rearm::REMOVED;
+ char * cc_addr = (reinterpret_cast<char *>(watcher)) - offsetof(control_conn_t, iob);
+ control_conn_t *conn = reinterpret_cast<control_conn_t *>(cc_addr);
+ if (revents & IN_EVENTS) {
+ if (conn->data_ready()) {
+ delete conn;
+ return rearm::REMOVED;
}
}
- if (revents & out_events) {
- if (conn->sendData()) {
- return Rearm::REMOVED;
+ if (revents & OUT_EVENTS) {
+ if (conn->send_data()) {
+ delete conn;
+ return rearm::REMOVED;
}
}
- return Rearm::NOOP;
+ return rearm::NOOP;
}
#endif