// Dispatch one request based on the packet type held in the first byte of the read buffer
// (rbuf[0]).
//
// Version queries, shutdown requests and unrecognised packet types are answered here. The
// find/load, start/stop/wake/release, unpin and list requests are forwarded to
// process_find_load(), process_start_stop(), process_unpin_service() and list_services().
//
// Whenever queue_packet() returns false, this function returns false straight away (see the
// note below on why no members may be touched after that).
4 bool control_conn_t::process_packet()
8 // Note that where we call queue_packet, we must generally check the return value. If it
9 // returns false it has either deleted the connection or marked it for deletion; we
10 // shouldn't touch instance members after that point.
12 int pktType = rbuf[0];
13 if (pktType == DINIT_CP_QUERYVERSION) {
15 // DINIT_RP_CPVERSION, (2 byte) minimum compatible version, (2 byte) maximum compatible version
16 char replyBuf[] = { DINIT_RP_CPVERSION, 0, 0, 0, 0 };
// Send the complete 5-byte reply (type byte plus the two 2-byte version fields). Passing a
// length of 1 would transmit only the type byte and drop both version fields.
17 if (! queue_packet(replyBuf, sizeof(replyBuf))) return false;
21 if (pktType == DINIT_CP_FINDSERVICE || pktType == DINIT_CP_LOADSERVICE) {
22 return process_find_load(pktType);
24 if (pktType == DINIT_CP_STARTSERVICE || pktType == DINIT_CP_STOPSERVICE
25 || pktType == DINIT_CP_WAKESERVICE || pktType == DINIT_CP_RELEASESERVICE) {
26 return process_start_stop(pktType);
28 if (pktType == DINIT_CP_UNPINSERVICE) {
29 return process_unpin_service();
31 if (pktType == DINIT_CP_SHUTDOWN) {
// A shutdown request carries the shutdown type in its second byte, so at least 2 bytes
// must have been received before it can be acted on.
33 if (rbuf.get_length() < 2) {
38 auto sd_type = static_cast<shutdown_type_t>(rbuf[1]);
40 services->stop_all_services(sd_type);
41 char ackBuf[] = { DINIT_RP_ACK };
42 if (! queue_packet(ackBuf, 1)) return false;
44 // Clear the packet from the buffer
49 if (pktType == DINIT_CP_LISTSERVICES) {
50 return list_services();
53 // Unrecognized: give error response
54 char outbuf[] = { DINIT_RP_BADREQ };
55 if (! queue_packet(outbuf, 1)) return false;
// Mark the connection bad and watch only for output readiness from here on.
56 bad_conn_close = true;
57 iob.set_watches(OUT_EVENTS);
// Handle a find-service / load-service request (pktType is the request's packet type).
//
// Request layout as read below: the packet type byte, a 2-byte service name length at
// offset 1, then the service name starting at offset 3.
//
// The service record is obtained via services->load_service() for DINIT_CP_LOADSERVICE;
// services->find_service() is the other lookup used here. When a record is found the reply
// is DINIT_RP_SERVICERECORD, the current state, the raw bytes of a newly allocated handle,
// and the target state. The other reply built here is the single byte DINIT_RP_NOSERVICE.
//
// A zero name length, or a total packet length above 1024, is answered with DINIT_RP_BADREQ
// and the connection is marked bad. A false result from queue_packet() is returned as-is.
62 bool control_conn_t::process_find_load(int pktType)
// NOTE(review): 4 presumably = type byte + 2 length bytes + at least one name byte - confirm.
66 constexpr int pkt_size = 4;
68 if (rbuf.get_length() < pkt_size) {
// Read the 2-byte name length that follows the packet type byte.
74 rbuf.extract((char *)&svcSize, 1, 2);
75 chklen = svcSize + 3; // packet type + (2 byte) length + service name
76 if (svcSize <= 0 || chklen > 1024) {
77 // Queue error response / mark connection bad
78 char badreqRep[] = { DINIT_RP_BADREQ };
79 if (! queue_packet(badreqRep, 1)) return false;
80 bad_conn_close = true;
81 iob.set_watches(OUT_EVENTS);
85 if (rbuf.get_length() < chklen) {
86 // packet not complete yet; read more
90 service_record * record = nullptr;
// The name starts at offset 3, after the type byte and the 2-byte length.
92 string serviceName = rbuf.extract_string(3, svcSize);
94 if (pktType == DINIT_CP_LOADSERVICE) {
97 record = services->load_service(serviceName.c_str());
// A load failure is logged here; record is not assigned in this handler.
99 catch (service_load_exc &slexc) {
100 log(loglevel_t::ERROR, "Could not load service ", slexc.serviceName, ": ", slexc.excDescription);
105 record = services->find_service(serviceName.c_str());
108 if (record != nullptr) {
109 // Allocate a service handle
110 handle_t handle = allocate_service_handle(record);
111 std::vector<char> rp_buf;
113 rp_buf.push_back(DINIT_RP_SERVICERECORD);
114 rp_buf.push_back(static_cast<char>(record->get_state()));
// Append the handle as its raw in-memory bytes, sizeof(handle) of them.
115 for (int i = 0; i < (int) sizeof(handle); i++) {
116 rp_buf.push_back(*(((char *) &handle) + i));
118 rp_buf.push_back(static_cast<char>(record->get_target_state()));
119 if (! queue_packet(std::move(rp_buf))) return false;
122 std::vector<char> rp_buf = { DINIT_RP_NOSERVICE };
123 if (! queue_packet(std::move(rp_buf))) return false;
126 // Clear the packet from the buffer
127 rbuf.consume(chklen);
// Handle a start / stop / wake / release request for a service identified by handle
// (pktType is the request's packet type).
//
// The handle is resolved with find_service_for_key(); an unknown handle is answered with
// DINIT_RP_BADREQ and the connection is marked bad. Otherwise the requested operation is
// applied, services->process_queues() is run, and the reply is DINIT_RP_ALREADYSS if the
// service is then already in the state the request aims for (STARTED for start/wake, STOPPED
// for stop/release) or DINIT_RP_ACK if not. The request is then consumed from the read buffer.
// A false result from queue_packet() is returned as-is.
132 bool control_conn_t::process_start_stop(int pktType)
// Packet type byte + pin byte + service handle.
136 constexpr int pkt_size = 2 + sizeof(handle_t);
138 if (rbuf.get_length() < pkt_size) {
143 // 1 byte: packet type
144 // 1 byte: pin in requested state (0 = no pin, 1 = pin)
145 // 4 bytes: service handle
// NOTE(review): the handle is read as sizeof(handle) bytes; the "4 bytes" above holds only
// if handle_t is 4 bytes wide - confirm.
// Only the exact value 1 requests a pin; any other value is treated as "no pin".
147 bool do_pin = (rbuf[1] == 1);
149 rbuf.extract((char *) &handle, 2, sizeof(handle));
151 service_record *service = find_service_for_key(handle);
152 if (service == nullptr) {
153 // Service handle is bad
154 char badreqRep[] = { DINIT_RP_BADREQ };
155 if (! queue_packet(badreqRep, 1)) return false;
156 bad_conn_close = true;
157 iob.set_watches(OUT_EVENTS);
// Set per request type below; sampled after process_queues() has run.
161 bool already_there = false;
164 case DINIT_CP_STARTSERVICE:
165 // start service, mark as required
166 if (do_pin) service->pin_start();
168 services->process_queues();
169 already_there = service->get_state() == service_state_t::STARTED;
171 case DINIT_CP_STOPSERVICE:
172 // force service to stop
173 if (do_pin) service->pin_stop();
175 service->forced_stop();
176 services->process_queues();
177 already_there = service->get_state() == service_state_t::STOPPED;
179 case DINIT_CP_WAKESERVICE:
180 // re-start a stopped service (do not mark as required)
181 if (do_pin) service->pin_start();
182 service->start(false);
183 services->process_queues();
184 already_there = service->get_state() == service_state_t::STARTED;
186 case DINIT_CP_RELEASESERVICE:
187 // remove required mark, stop if not required by dependents
188 if (do_pin) service->pin_stop();
189 service->stop(false);
190 services->process_queues();
191 already_there = service->get_state() == service_state_t::STOPPED;
// One-byte reply: ALREADYSS if the target state was already reached, ACK otherwise.
195 char ack_buf[] = { (char)(already_there ? DINIT_RP_ALREADYSS : DINIT_RP_ACK) };
197 if (! queue_packet(ack_buf, 1)) return false;
200 // Clear the packet from the buffer
201 rbuf.consume(pkt_size);
// Handle an unpin request for a service identified by handle.
//
// The handle is read from offset 1 of the read buffer and resolved with
// find_service_for_key(); an unknown handle is answered with DINIT_RP_BADREQ and the
// connection is marked bad. Otherwise services->process_queues() is run, DINIT_RP_ACK is
// sent, and the request is consumed from the read buffer. A false result from queue_packet()
// is returned as-is.
206 bool control_conn_t::process_unpin_service()
// Packet type byte + service handle.
210 constexpr int pkt_size = 1 + sizeof(handle_t);
212 if (rbuf.get_length() < pkt_size) {
217 // 1 byte: packet type
218 // 4 bytes: service handle
// NOTE(review): the handle is read as sizeof(handle) bytes; "4 bytes" holds only if handle_t
// is 4 bytes wide - confirm.
221 rbuf.extract((char *) &handle, 1, sizeof(handle));
223 service_record *service = find_service_for_key(handle);
224 if (service == nullptr) {
225 // Service handle is bad
226 char badreqRep[] = { DINIT_RP_BADREQ };
227 if (! queue_packet(badreqRep, 1)) return false;
228 bad_conn_close = true;
229 iob.set_watches(OUT_EVENTS);
234 services->process_queues();
235 char ack_buf[] = { (char) DINIT_RP_ACK };
236 if (! queue_packet(ack_buf, 1)) return false;
239 // Clear the packet from the buffer
240 rbuf.consume(pkt_size);
// Reply to a list-services request.
//
// The one-byte request is consumed first. Then, for every entry returned by
// services->list_services(), one DINIT_RP_SVCINFO packet is queued with this layout:
//   byte 0      DINIT_RP_SVCINFO
//   byte 1      length of the service name (at most 255)
//   byte 2      current state
//   byte 3      target state
//   byte 4..    reserved bytes, with the name bytes starting at offset 8
// After the last entry a single DINIT_RP_LISTDONE byte is queued.
//
// A false result from queue_packet() is returned as-is. std::bad_alloc is caught within
// this function.
245 bool control_conn_t::list_services()
247 rbuf.consume(1); // clear request packet
251 auto slist = services->list_services();
252 for (auto sptr : slist) {
253 std::vector<char> pkt_buf;
255 const std::string &name = sptr->get_service_name();
// The name length travels in the single byte pkt_buf[1], so it must not exceed 255. A cap
// of 256 would wrap to 0 in that byte while 256 name bytes were still appended, leaving the
// length field and the payload inconsistent.
256 int nameLen = std::min((size_t)255, name.length());
257 pkt_buf.resize(8 + nameLen);
259 pkt_buf[0] = DINIT_RP_SVCINFO;
260 pkt_buf[1] = nameLen;
261 pkt_buf[2] = static_cast<char>(sptr->get_state());
262 pkt_buf[3] = static_cast<char>(sptr->get_target_state());
264 pkt_buf[4] = 0; // reserved
// Copy the (possibly truncated) name after the 8-byte header.
269 for (int i = 0; i < nameLen; i++) {
270 pkt_buf[8+i] = name[i];
273 if (! queue_packet(std::move(pkt_buf))) return false;
276 char ack_buf[] = { (char) DINIT_RP_LISTDONE };
277 if (! queue_packet(ack_buf, 1)) return false;
281 catch (std::bad_alloc &exc)
// Choose a handle value for `record` and register the pairing in both keyServiceMap
// (handle -> record) and serviceKeyMap (record -> handle).
//
// The candidate handle starts at 0 and, whenever an existing key equals it, moves to one
// past the largest key seen so far. Throws std::bad_alloc if that largest key is already
// the maximum value of handle_t.
// NOTE(review): this yields an unused key only if keyServiceMap is iterated in ascending
// key order - confirm the container is an ordered map.
288 control_conn_t::handle_t control_conn_t::allocate_service_handle(service_record *record)
// Stays true only if no existing entry already maps to this record.
290 bool is_unique = true;
291 handle_t largest_seen = 0;
292 handle_t candidate = 0;
293 for (auto p : keyServiceMap) {
294 if (p.first > largest_seen) largest_seen = p.first;
295 if (p.first == candidate) {
// No value above the largest key is representable: give up rather than wrap around.
296 if (largest_seen == std::numeric_limits<handle_t>::max()) throw std::bad_alloc();
297 candidate = largest_seen + 1;
299 is_unique &= (p.second != record);
302 keyServiceMap[candidate] = record;
303 serviceKeyMap.insert(std::make_pair(record, candidate));
// Register this connection as a listener on the record.
306 record->addListener(this);
// Send a reply packet of `size` bytes starting at `pkt`.
//
// The packet is written directly to the connection's file descriptor where possible;
// whatever is not written is copied into a new entry at the back of outbuf, and output
// readiness is added to the watched events.
//
// Callers in this file treat a false return as "stop using this connection" and return
// immediately. If queueing fails with std::bad_alloc, the connection is marked bad and
// only output readiness is watched afterwards.
312 bool control_conn_t::queue_packet(const char *pkt, unsigned size) noexcept
// Keep watching for input only while the connection has not been marked bad.
314 int in_flag = bad_conn_close ? 0 : IN_EVENTS;
315 bool was_empty = outbuf.empty();
317 // If the queue is empty, we can try to write the packet out now rather than queueing it.
318 // If the write is unsuccessful or partial, we queue the remainder.
320 int wr = write(iob.get_watched_fd(), pkt, size);
322 if (errno == EPIPE) {
// Any error other than "would block" / "interrupted" is logged as a warning.
325 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
326 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
329 // EAGAIN etc: fall through to below
// Whole packet written: nothing to queue, so no output watch is requested.
332 if ((unsigned)wr == size) {
334 iob.set_watches(in_flag);
342 // Create a vector out of the (remaining part of the) packet:
344 outbuf.emplace_back(pkt, pkt + size);
345 iob.set_watches(in_flag | OUT_EVENTS);
348 catch (std::bad_alloc &baexc) {
349 // Mark the connection bad, and stop reading further requests
350 bad_conn_close = true;
353 // We can't send out-of-memory response as we already wrote as much as we
354 // could above. Neither can we later send the response since we have currently
355 // sent an incomplete packet. All we can do is close the connection.
359 iob.set_watches(OUT_EVENTS);
365 // This queue_packet method is frustratingly similar to the one above, but the subtle differences
366 // make them extraordinarily difficult to combine into a single method.
//
// Send a reply packet held in a vector that the caller hands over (callers pass it with
// std::move and do not use it afterwards).
//
// The packet is written directly to the connection's file descriptor where possible;
// otherwise the vector is moved to the back of outbuf and output readiness is added to the
// watched events.
//
// Callers in this file treat a false return as "stop using this connection" and return
// immediately. If queueing fails with std::bad_alloc, the connection is marked bad and only
// output readiness is watched afterwards.
367 bool control_conn_t::queue_packet(std::vector<char> &&pkt) noexcept
// Keep watching for input only while the connection has not been marked bad.
369 int in_flag = bad_conn_close ? 0 : IN_EVENTS;
370 bool was_empty = outbuf.empty();
374 // We can try sending the packet immediately:
375 int wr = write(iob.get_watched_fd(), pkt.data(), pkt.size());
377 if (errno == EPIPE) {
// Any error other than "would block" / "interrupted" is logged as a warning.
380 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
381 log(loglevel_t::WARN, "Error writing to control connection: ", strerror(errno));
384 // EAGAIN etc: fall through to below
// Whole packet written: nothing to queue, so no output watch is requested.
387 if ((unsigned)wr == pkt.size()) {
389 iob.set_watches(in_flag);
// `pkt` is a named rvalue reference and therefore an lvalue here; without std::move the
// vector would be copied (a second buffer allocation) instead of having its storage
// transferred into the queue.
397 outbuf.emplace_back(std::move(pkt));
398 iob.set_watches(in_flag | OUT_EVENTS);
401 catch (std::bad_alloc &baexc) {
402 // Mark the connection bad, and stop reading further requests
403 bad_conn_close = true;
406 // We can't send out-of-memory response as we already wrote as much as we
407 // could above. Neither can we later send the response since we have currently
408 // sent an incomplete packet. All we can do is close the connection.
412 iob.set_watches(OUT_EVENTS);
// Queue a two-byte DINIT_ROLLBACK_COMPLETED packet on this connection and return
// queue_packet()'s result.
418 bool control_conn_t::rollback_complete() noexcept
// NOTE(review): the second byte (2) presumably encodes the packet's total length - confirm
// against the protocol definition.
420 char ackBuf[2] = { DINIT_ROLLBACK_COMPLETED, 2 };
421 return queue_packet(ackBuf, 2);
// Pull newly available bytes from the connection into rbuf and, once at least chklen bytes
// are buffered, hand them to process_packet().
//
// When a packet is processed, the value returned is the negation of process_packet()'s
// result. If the buffer instead holds exactly 1024 bytes, the request is treated as too
// large: a warning is logged, the connection is marked bad and only output readiness is
// watched. The final lines re-arm the input watch, adding output readiness when the
// connection is marked bad or outbuf still holds data.
424 bool control_conn_t::data_ready() noexcept
426 int fd = iob.get_watched_fd();
428 int r = rbuf.fill(fd);
430 // Note file descriptor is non-blocking
432 if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
// This is the failure path of rbuf.fill(), i.e. a read, so the message says "reading"
// (it previously reported a write error).
433 log(loglevel_t::WARN, "Error reading from control connection: ", strerror(errno));
444 if (rbuf.get_length() >= chklen) {
446 return !process_packet();
448 catch (std::bad_alloc &baexc) {
453 else if (rbuf.get_length() == 1024) {
455 log(loglevel_t::WARN, "Received too-large control package; dropping connection");
456 bad_conn_close = true;
457 iob.set_watches(OUT_EVENTS);
460 int out_flags = (bad_conn_close || !outbuf.empty()) ? OUT_EVENTS : 0;
461 iob.set_watches(IN_EVENTS | out_flags);
// Write pending output to the connection.
//
// The packet at the front of outbuf is written starting at offset outpkt_index, and
// outpkt_index advances by the number of bytes written. Once it reaches the packet's size
// that packet is finished. When outbuf has drained, oom_close is not set and the connection
// is not marked bad, the watch is switched back to input readiness.
467 bool control_conn_t::send_data() noexcept
// Nothing left to send on a connection already marked bad.
469 if (outbuf.empty() && bad_conn_close) {
// Out-of-memory reply: a single DINIT_RP_OOM byte written directly, bypassing outbuf.
// The result of this write is not checked.
472 char oomBuf[] = { DINIT_RP_OOM };
473 write(iob.get_watched_fd(), oomBuf, 1);
478 vector<char> & pkt = outbuf.front();
479 char *data = pkt.data();
// Resume the front packet where the previous write stopped.
480 int written = write(iob.get_watched_fd(), data + outpkt_index, pkt.size() - outpkt_index);
482 if (errno == EPIPE) {
486 else if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
487 // spurious readiness notification?
490 log(loglevel_t::ERROR, "Error writing to control connection: ", strerror(errno));
496 outpkt_index += written;
497 if (outpkt_index == pkt.size()) {
498 // We've finished this packet, move on to the next:
501 if (outbuf.empty() && ! oom_close) {
502 if (! bad_conn_close) {
503 iob.set_watches(IN_EVENTS);
// Tear down the connection: close its file descriptor, deregister the watcher from the event
// loop, remove this connection as a listener from every service record it holds a handle
// for, and decrement the count of active control connections.
514 control_conn_t::~control_conn_t() noexcept
// NOTE(review): the descriptor is closed before the watcher is deregistered - confirm the
// event loop tolerates deregistering an already-closed descriptor.
516 close(iob.get_watched_fd());
517 iob.deregister(loop);
519 // Clear service listeners
520 for (auto p : serviceKeyMap) {
521 p.first->removeListener(this);
524 active_control_conns--;