4 bool ControlConn::processPacket()
8 // Note that where we call queuePacket, we must generally check the return value. If it
9 // returns false it has either deleted the connection or marked it for deletion; we
10 // shouldn't touch instance members after that point.
12 int pktType = rbuf[0];
13 if (pktType == DINIT_CP_QUERYVERSION) {
15 // DINIT_RP_CVERSION, (2 byte) minimum compatible version, (2 byte) maximum compatible version
16 char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
17 if (! queuePacket(replyBuf, 1)) return false;
21 if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
22 return processFindLoad(pktType);
24 if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE
25 || pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) {
26 return processStartStop(pktType);
28 if (pktType == DINIT_CP_UNPINSERVICE) {
29 return processUnpinService();
31 if (pktType == DINIT_CP_SHUTDOWN) {
33 if (rbuf.get_length() < 2) {
38 auto sd_type = static_cast<ShutdownType>(rbuf[1]);
40 service_set->stop_all_services(sd_type);
41 char ackBuf[] = { DINIT_RP_ACK };
42 if (! queuePacket(ackBuf, 1)) return false;
44 // Clear the packet from the buffer
50 // Unrecognized: give error response
51 char outbuf[] = { DINIT_RP_BADREQ };
52 if (! queuePacket(outbuf, 1)) return false;
53 bad_conn_close = true;
54 iob.setWatches(OUT_EVENTS);
59 bool ControlConn::processFindLoad(int pktType)
63 constexpr int pkt_size = 4;
65 if (rbuf.get_length() < pkt_size) {
71 rbuf.extract((char *)&svcSize, 1, 2);
72 chklen = svcSize + 3; // packet type + (2 byte) length + service name
73 if (svcSize <= 0 || chklen > 1024) {
74 // Queue error response / mark connection bad
75 char badreqRep[] = { DINIT_RP_BADREQ };
76 if (! queuePacket(badreqRep, 1)) return false;
77 bad_conn_close = true;
78 iob.setWatches(OUT_EVENTS);
82 if (rbuf.get_length() < chklen) {
83 // packet not complete yet; read more
87 ServiceRecord * record = nullptr;
89 string serviceName = rbuf.extract_string(3, svcSize);
91 if (pktType == DINIT_CP_LOADSERVICE) {
94 record = service_set->loadService(serviceName);
96 catch (ServiceLoadExc &slexc) {
97 log(LogLevel::ERROR, "Could not load service ", slexc.serviceName, ": ", slexc.excDescription);
102 record = service_set->findService(serviceName.c_str());
105 if (record != nullptr) {
106 // Allocate a service handle
107 handle_t handle = allocateServiceHandle(record);
108 std::vector<char> rp_buf;
110 rp_buf.push_back(DINIT_RP_SERVICERECORD);
111 rp_buf.push_back(static_cast<char>(record->getState()));
112 for (int i = 0; i < (int) sizeof(handle); i++) {
113 rp_buf.push_back(*(((char *) &handle) + i));
115 rp_buf.push_back(static_cast<char>(record->getTargetState()));
116 if (! queuePacket(std::move(rp_buf))) return false;
119 std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
120 if (! queuePacket(std::move(rp_buf))) return false;
123 // Clear the packet from the buffer
124 rbuf.consume(chklen);
129 bool ControlConn::processStartStop(int pktType)
133 constexpr int pkt_size = 2 + sizeof(handle_t);
135 if (rbuf.get_length() < pkt_size) {
140 // 1 byte: packet type
141 // 1 byte: pin in requested state (0 = no pin, 1 = pin)
142 // 4 bytes: service handle
144 bool do_pin = (rbuf[1] == 1);
146 rbuf.extract((char *) &handle, 2, sizeof(handle));
148 ServiceRecord *service = findServiceForKey(handle);
149 if (service == nullptr) {
150 // Service handle is bad
151 char badreqRep[] = { DINIT_RP_BADREQ };
152 if (! queuePacket(badreqRep, 1)) return false;
153 bad_conn_close = true;
154 iob.setWatches(OUT_EVENTS);
158 bool already_there = false;
161 case DINIT_CP_STARTSERVICE:
162 // start service, mark as required
163 if (do_pin) service->pinStart();
165 service_set->processQueues(true);
166 already_there = service->getState() == ServiceState::STARTED;
168 case DINIT_CP_STOPSERVICE:
169 // force service to stop
170 if (do_pin) service->pinStop();
172 service->forceStop();
173 service_set->processQueues(false);
174 already_there = service->getState() == ServiceState::STOPPED;
176 case DINIT_CP_WAKESERVICE:
177 // re-start a stopped service (do not mark as required)
178 if (do_pin) service->pinStart();
179 service->start(false);
180 service_set->processQueues(true);
181 already_there = service->getState() == ServiceState::STARTED;
183 case DINIT_CP_RELEASESERVICE:
184 // remove required mark, stop if not required by dependents
185 if (do_pin) service->pinStop();
187 service_set->processQueues(false);
188 already_there = service->getState() == ServiceState::STOPPED;
192 char ack_buf[] = { (char)(already_there ? DINIT_RP_ALREADYSS : DINIT_RP_ACK) };
194 if (! queuePacket(ack_buf, 1)) return false;
197 // Clear the packet from the buffer
198 rbuf.consume(pkt_size);
203 bool ControlConn::processUnpinService()
207 constexpr int pkt_size = 1 + sizeof(handle_t);
209 if (rbuf.get_length() < pkt_size) {
214 // 1 byte: packet type
215 // 4 bytes: service handle
218 rbuf.extract((char *) &handle, 1, sizeof(handle));
220 ServiceRecord *service = findServiceForKey(handle);
221 if (service == nullptr) {
222 // Service handle is bad
223 char badreqRep[] = { DINIT_RP_BADREQ };
224 if (! queuePacket(badreqRep, 1)) return false;
225 bad_conn_close = true;
226 iob.setWatches(OUT_EVENTS);
231 service_set->processQueues(true);
232 char ack_buf[] = { (char) DINIT_RP_ACK };
233 if (! queuePacket(ack_buf, 1)) return false;
236 // Clear the packet from the buffer
237 rbuf.consume(pkt_size);
242 ControlConn::handle_t ControlConn::allocateServiceHandle(ServiceRecord *record)
244 bool is_unique = true;
245 handle_t largest_seen = 0;
246 handle_t candidate = 0;
247 for (auto p : keyServiceMap) {
248 if (p.first > largest_seen) largest_seen = p.first;
249 if (p.first == candidate) {
250 if (largest_seen == std::numeric_limits<handle_t>::max()) throw std::bad_alloc();
251 candidate = largest_seen + 1;
253 is_unique &= (p.second != record);
256 keyServiceMap[candidate] = record;
257 serviceKeyMap.insert(std::make_pair(record, candidate));
260 record->addListener(this);
267 bool ControlConn::queuePacket(const char *pkt, unsigned size) noexcept
269 int in_flag = bad_conn_close ? 0 : IN_EVENTS;
270 bool was_empty = outbuf.empty();
272 // If the queue is empty, we can try to write the packet out now rather than queueing it.
273 // If the write is unsuccessful or partial, we queue the remainder.
275 int wr = write(iob.fd, pkt, size);
277 if (errno == EPIPE) {
280 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
281 log(LogLevel::WARN, "Error writing to control connection: ", strerror(errno));
284 // EAGAIN etc: fall through to below
287 if ((unsigned)wr == size) {
289 iob.setWatches(in_flag);
297 // Create a vector out of the (remaining part of the) packet:
299 outbuf.emplace_back(pkt, pkt + size);
300 iob.setWatches(in_flag | OUT_EVENTS);
303 catch (std::bad_alloc &baexc) {
304 // Mark the connection bad, and stop reading further requests
305 bad_conn_close = true;
308 // We can't send out-of-memory response as we already wrote as much as we
309 // could above. Neither can we later send the response since we have currently
310 // sent an incomplete packet. All we can do is close the connection.
314 iob.setWatches(OUT_EVENTS);
320 // This queuePacket method is frustratingly similar to the one above, but the subtle differences
321 // make them extraordinary difficult to combine into a single method.
322 bool ControlConn::queuePacket(std::vector<char> &&pkt) noexcept
324 int in_flag = bad_conn_close ? 0 : IN_EVENTS;
325 bool was_empty = outbuf.empty();
329 // We can try sending the packet immediately:
330 int wr = write(iob.fd, pkt.data(), pkt.size());
332 if (errno == EPIPE) {
335 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
336 log(LogLevel::WARN, "Error writing to control connection: ", strerror(errno));
339 // EAGAIN etc: fall through to below
342 if ((unsigned)wr == pkt.size()) {
344 iob.setWatches(in_flag);
352 outbuf.emplace_back(pkt);
353 iob.setWatches(in_flag | OUT_EVENTS);
356 catch (std::bad_alloc &baexc) {
357 // Mark the connection bad, and stop reading further requests
358 bad_conn_close = true;
361 // We can't send out-of-memory response as we already wrote as much as we
362 // could above. Neither can we later send the response since we have currently
363 // sent an incomplete packet. All we can do is close the connection.
367 iob.setWatches(OUT_EVENTS);
373 bool ControlConn::rollbackComplete() noexcept
375 char ackBuf[2] = { DINIT_ROLLBACK_COMPLETED, 2 };
376 return queuePacket(ackBuf, 2);
379 bool ControlConn::dataReady() noexcept
383 int r = rbuf.fill(fd);
385 // Note file descriptor is non-blocking
387 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
388 log(LogLevel::WARN, "Error writing to control connection: ", strerror(errno));
399 if (rbuf.get_length() >= chklen) {
401 return !processPacket();
403 catch (std::bad_alloc &baexc) {
408 else if (rbuf.get_length() == 1024) {
410 log(LogLevel::WARN, "Received too-large control package; dropping connection");
411 bad_conn_close = true;
412 iob.setWatches(OUT_EVENTS);
415 int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
416 iob.setWatches(IN_EVENTS | out_flags);
422 bool ControlConn::sendData() noexcept
424 if (outbuf.empty() && bad_conn_close) {
427 char oomBuf[] = { DINIT_RP_OOM };
428 write(iob.fd, oomBuf, 1);
433 vector<char> & pkt = outbuf.front();
434 char *data = pkt.data();
435 int written = write(iob.fd, data + outpkt_index, pkt.size() - outpkt_index);
437 if (errno == EPIPE) {
441 else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
442 // spurious readiness notification?
445 log(LogLevel::ERROR, "Error writing to control connection: ", strerror(errno));
451 outpkt_index += written;
452 if (outpkt_index == pkt.size()) {
453 // We've finished this packet, move on to the next:
456 if (outbuf.empty() && ! oom_close) {
457 if (! bad_conn_close) {
458 iob.setWatches(IN_EVENTS);
469 ControlConn::~ControlConn() noexcept
472 iob.deregister(*loop);
474 // Clear service listeners
475 for (auto p : serviceKeyMap) {
476 p.first->removeListener(this);
479 active_control_conns--;