// Requests:
-// Start or stop a named service:
-constexpr static int DINIT_CP_STARTSERVICE = 0;
-constexpr static int DINIT_CP_STOPSERVICE = 1;
+// Query protocol version:
+constexpr static int DINIT_CP_QUERYVERSION = 0;
+
+// Find (but don't load) a service:
+constexpr static int DINIT_CP_FINDSERVICE = 1;
+
+// Find or load a service:
+constexpr static int DINIT_CP_LOADSERVICE = 2;
+
+// Start or stop a service:
+constexpr static int DINIT_CP_STARTSERVICE = 3;
+constexpr static int DINIT_CP_STOPSERVICE = 4;
// Roll-back all services:
-constexpr static int DINIT_CP_ROLLBACKALL = 2;
+constexpr static int DINIT_CP_ROLLBACKALL = 5;
-// Query protocol version:
-constexpr static int DINIT_CP_QUERYVERSION = 3;
// Replies:
// Connection being closed due to out-of-memory condition
constexpr static int DINIT_RP_OOM = 53;
-// Reply: rollback completed
-constexpr static int DINIT_ROLLBACK_COMPLETED = 60;
-
// Start service replies:
-constexpr static int DINIT_RP_SERVICELOADERR = 61;
-constexpr static int DINIT_RP_SERVICEOOM = 62; // couldn't start due to out-of-memory
+constexpr static int DINIT_RP_SERVICELOADERR = 54;
+constexpr static int DINIT_RP_SERVICEOOM = 55; // couldn't start due to out-of-memory
+
+constexpr static int DINIT_RP_SSISSUED = 56; // service start/stop was issued (includes 4-byte service handle)
+constexpr static int DINIT_RP_SSREDUNDANT = 57; // service was already started/stopped (or for stop, not loaded)
// Query version response:
-constexpr static int DINIT_RP_CPVERSION = 63;
+constexpr static int DINIT_RP_CPVERSION = 58;
+
+// Service record loaded/found
+constexpr static int DINIT_RP_SERVICERECORD = 59;
+// followed by 4-byte service handle, 1-byte service state
+
+// Couldn't find/load service
+constexpr static int DINIT_RP_NOSERVICE = 60;
+
+
+
+// Information:
+
+// Service event occurred (4-byte service handle, 1 byte event code)
+constexpr static int DINIT_IP_SERVICEEVENT = 100;
+
+// rollback completed
+constexpr static int DINIT_ROLLBACK_COMPLETED = 101;
bufidx -= 1;
return;
}
+ if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
+ processFindLoad(pktType);
+ return;
+ }
if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE) {
- if (bufidx < 4) {
- chklen = 4;
- return;
- }
-
- uint16_t svcSize;
- memcpy(&svcSize, iobuf + 1, 2);
- chklen = svcSize + 3;
- if (svcSize <= 0 || chklen > 1024) {
- // Queue error response mark connection bad
- char badreqRep[] = { DINIT_RP_BADREQ };
- if (! queuePacket(badreqRep, 1)) return;
- bad_conn_close = true;
- ev_io_set(&iob, iob.fd, EV_WRITE);
- return;
- }
-
- if (bufidx < chklen) {
- // packet not complete yet; read more
- return;
- }
-
- string serviceName(iobuf + 3, (size_t) svcSize);
- if (pktType == DINIT_CP_STARTSERVICE) {
- // TODO do not allow services to be started during system shutdown
- try {
- char ack_buf[] = { DINIT_RP_ACK };
- service_set->startService(serviceName.c_str());
- if (! queuePacket(ack_buf, 1)) return;
- }
- catch (ServiceLoadExc &slexc) {
- log(LogLevel::ERROR, "Could not start service ", slexc.serviceName, ": ", slexc.excDescription);
- char outbuf[] = { DINIT_RP_SERVICELOADERR };
- if (! queuePacket(outbuf, 1)) return;
- }
- catch (std::bad_alloc &baexc) {
- char outbuf[] = { DINIT_RP_SERVICEOOM };
- if (! queuePacket(outbuf, 1)) return; // might degenerate to DINIT_RP_OOM, which is fine.
- }
- }
- else {
- // TODO verify the named service exists?
- service_set->stopService(serviceName.c_str());
- }
-
- // Clear the packet from the buffer
- memmove(iobuf, iobuf + chklen, 1024 - chklen);
- bufidx -= chklen;
- chklen = 0;
+ processStartStop(pktType);
return;
}
else if (pktType == DINIT_CP_ROLLBACKALL) {
return;
}
+void ControlConn::processFindLoad(int pktType)
+{
+ using std::string;
+
+ constexpr int pkt_size = 4;
+
+ if (bufidx < pkt_size) {
+ chklen = pkt_size;
+ return;
+ }
+
+ uint16_t svcSize;
+ memcpy(&svcSize, iobuf + 1, 2);
+ chklen = svcSize + 3;
+ if (svcSize <= 0 || chklen > 1024) {
+ // Queue error response / mark connection bad
+ char badreqRep[] = { DINIT_RP_BADREQ };
+ if (! queuePacket(badreqRep, 1)) return;
+ bad_conn_close = true;
+ ev_io_set(&iob, iob.fd, EV_WRITE);
+ return;
+ }
+
+ if (bufidx < chklen) {
+ // packet not complete yet; read more
+ return;
+ }
+
+ ServiceRecord * record = nullptr;
+
+ string serviceName(iobuf + 3, (size_t) svcSize);
+ if (pktType == DINIT_CP_LOADSERVICE) {
+ // LOADSERVICE
+ try {
+ record = service_set->loadService(serviceName);
+ }
+ catch (ServiceLoadExc &slexc) {
+ log(LogLevel::ERROR, "Could not load service ", slexc.serviceName, ": ", slexc.excDescription);
+ }
+ }
+ else {
+ // FINDSERVICE
+ record = service_set->findService(serviceName.c_str());
+ }
+
+ if (record != nullptr) {
+ // Allocate a service handle
+ handle_t handle = allocateServiceHandle(record);
+ std::vector<char> rp_buf;
+ rp_buf.reserve(6);
+ rp_buf.push_back(DINIT_RP_SERVICERECORD);
+ rp_buf.push_back(static_cast<char>(record->getState()));
+ for (int i = 0; i < (int) sizeof(handle); i++) {
+ rp_buf.push_back(*(((char *) &handle) + i));
+ }
+ if (! queuePacket(std::move(rp_buf))) return;
+ }
+ else {
+ std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
+ if (! queuePacket(std::move(rp_buf))) return;
+ }
+
+ // Clear the packet from the buffer
+ memmove(iobuf, iobuf + chklen, 1024 - chklen);
+ bufidx -= chklen;
+ chklen = 0;
+ return;
+}
+
+void ControlConn::processStartStop(int pktType)
+{
+ using std::string;
+
+ constexpr int pkt_size = 2 + sizeof(handle_t);
+
+ if (bufidx < pkt_size) {
+ chklen = pkt_size;
+ return;
+ }
+
+ // 1 byte: packet type
+ // 1 byte: pin in requested state (0 = no pin, 1 = pin)
+ // 4 bytes: service handle
+
+ bool do_pin = (iobuf[1] == 1);
+ handle_t handle;
+ memcpy(&handle, iobuf + 2, sizeof(handle));
+
+ ServiceRecord *service = findServiceForKey(handle);
+ if (service == nullptr) {
+ // Service handle is bad
+ char badreqRep[] = { DINIT_RP_BADREQ };
+ if (! queuePacket(badreqRep, 1)) return;
+ bad_conn_close = true;
+ ev_io_set(&iob, iob.fd, EV_WRITE);
+ return;
+ }
+ else {
+ if (pktType == DINIT_CP_STARTSERVICE) {
+ if (do_pin) {
+ service->pinStart();
+ }
+ else {
+ service->start();
+ }
+ }
+ else {
+ if (do_pin) {
+ service->pinStop();
+ }
+ else {
+ service->stop();
+ }
+ }
+
+ char ack_buf[] = { DINIT_RP_ACK };
+ if (! queuePacket(ack_buf, 1)) return;
+ }
+
+ // Clear the packet from the buffer
+ memmove(iobuf, iobuf + pkt_size, 1024 - pkt_size);
+ bufidx -= pkt_size;
+ chklen = 0;
+ return;
+}
+
+ControlConn::handle_t ControlConn::allocateServiceHandle(ServiceRecord *record)
+{
+ bool is_unique = true;
+ handle_t largest_seen = 0;
+ handle_t candidate = 0;
+ for (auto p : keyServiceMap) {
+ if (p.first > largest_seen) largest_seen = p.first;
+ if (p.first == candidate) {
+ if (largest_seen == std::numeric_limits<handle_t>::max()) throw std::bad_alloc();
+ candidate = largest_seen + 1;
+ }
+ is_unique &= (p.second != record);
+ }
+
+ keyServiceMap[candidate] = record;
+ serviceKeyMap.insert(std::make_pair(record, candidate));
+
+ if (is_unique) {
+ record->addListener(this);
+ }
+
+ return candidate;
+}
+
+
bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept
{
if (bad_conn_close) return false;
bool ControlConn::rollbackComplete() noexcept
{
- char ackBuf[1] = { DINIT_ROLLBACK_COMPLETED };
- return queuePacket(ackBuf, 1);
+ char ackBuf[2] = { DINIT_ROLLBACK_COMPLETED, 2 };
+ return queuePacket(ackBuf, 2);
}
bool ControlConn::dataReady() noexcept
processPacket();
}
catch (std::bad_alloc &baexc) {
- // TODO
+ doOomClose();
}
}
close(iob.fd);
ev_io_stop(loop, &iob);
delete [] iobuf;
+
+ // Clear service listeners
+ for (auto p : serviceKeyMap) {
+ p.first->removeListener(this);
+ }
+
service_set->clearRollbackHandler(this);
active_control_conns--;
}
#include <list>
#include <vector>
+#include <unordered_map>
+#include <limits>
#include <unistd.h>
#include <ev++.h>
#include "dinit-log.h"
#include "control-cmds.h"
+#include "service-listener.h"
// Control connection for dinit
// "packet" format:
// (1 byte) packet type
// (N bytes) additional data (service name, etc)
-// for STARTSERVICE/STOPSERVICE:
+// for LOADSERVICE/FINDSERVICE:
// (2 bytes) service name length
-// (M buyes) service name (without nul terminator)
+// (M bytes) service name (without nul terminator)
-class ServiceSet;
+// Information packet:
+// (1 byte) packet type, >= 100
+// (1 byte) packet length (including all fields)
+// N bytes: packet data (N = (length - 2))
+class ServiceSet;
+class ServiceRecord;
-class ControlConn
+class ControlConn : private ServiceListener
{
friend void control_conn_cb(struct ev_loop *, ev_io *, int);
template <typename T> using list = std::list<T>;
template <typename T> using vector = std::vector<T>;
+ // A mapping between service records and their associated numerical identifier used
+ // in communction
+ using handle_t = uint32_t;
+ std::unordered_multimap<ServiceRecord *, handle_t> serviceKeyMap;
+ std::unordered_map<handle_t, ServiceRecord *> keyServiceMap;
+
// Buffer for outgoing packets. Each outgoing back is represented as a vector<char>.
list<vector<char>> outbuf;
// Current index within the first outgoing packet (all previous bytes have been sent).
// Process a packet. Can cause the ControlConn to be deleted iff there are no
// outgoing packets queued.
+ // Throws:
+ // std::bad_alloc - if an out-of-memory condition prevents processing
void processPacket();
+
+ void processStartStop(int pktType);
+
+ void processFindLoad(int pktType);
// Notify that data is ready to be read from the socket. Returns true in cases where the
// connection was deleted with potentially pending outgoing packets.
void sendData() noexcept;
+ // Allocate a new handle for a service; may throw std::bad_alloc
+ handle_t allocateServiceHandle(ServiceRecord *record);
+
+ ServiceRecord *findServiceForKey(uint32_t key)
+ {
+ try {
+ return keyServiceMap.at(key);
+ }
+ catch (std::out_of_range &exc) {
+ return nullptr;
+ }
+ }
+
+ // Close connection due to out-of-memory condition.
+ void doOomClose()
+ {
+ bad_conn_close = true;
+ oom_close = true;
+ ev_io_set(&iob, iob.fd, EV_WRITE);
+ }
+
+ // Process service event broadcast.
+ void serviceEvent(ServiceRecord * service, ServiceEvent event) noexcept final override
+ {
+ // For each service handle corresponding to the event, send an information packet.
+ auto range = serviceKeyMap.equal_range(service);
+ auto & i = range.first;
+ auto & end = range.second;
+ try {
+ while (i != end) {
+ uint32_t key = i->second;
+ std::vector<char> pkt;
+ constexpr int pktsize = 3 + sizeof(key);
+ pkt.reserve(pktsize);
+ pkt.push_back(DINIT_IP_SERVICEEVENT);
+ pkt.push_back(pktsize);
+ char * p = (char *) &key;
+ for (int j = 0; j < (int)sizeof(key); j++) {
+ pkt.push_back(*p++);
+ }
+ pkt.push_back(static_cast<char>(event));
+ queuePacket(std::move(pkt));
+ ++i;
+ }
+ }
+ catch (std::bad_alloc &exc) {
+ doOomClose();
+ }
+ }
+
public:
- ControlConn(struct ev_loop * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), bufidx(0), chklen(0)
+ ControlConn(struct ev_loop * loop, ServiceSet * service_set, int fd) : loop(loop), service_set(service_set), chklen(0), bufidx(0)
{
iobuf = new char[1024];
bool rollbackComplete() noexcept;
- ~ControlConn() noexcept;
+ virtual ~ControlConn() noexcept;
};